diff options
author | Simon MacMullen <simon@lshift.net> | 2010-03-18 11:05:59 +0000 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-03-18 11:05:59 +0000 |
commit | 087c3b638825322414cad5954ccd6a6fc86ccbac (patch) | |
tree | dff5bbcdbd1b49fdf0db1b4c468b166e34ca0751 /src/rabbit_router.erl | |
parent | 97d4d5e8c4907af4f902a2200877c9611d69895e (diff) | |
download | rabbitmq-server-087c3b638825322414cad5954ccd6a6fc86ccbac.tar.gz |
Reimplement rabbit_router in terms of delegate.
Diffstat (limited to 'src/rabbit_router.erl')
-rw-r--r-- | src/rabbit_router.erl | 131 |
1 files changed, 16 insertions, 115 deletions
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 884ea4ab..a8cffbdf 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,104 +33,40 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --behaviour(gen_server2). - --export([start_link/0, - deliver/2, +-export([deliver/2, match_bindings/2, match_routing_key/2]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% cross-node routing optimisation is disabled because of bug 19758. --define(BUG19758, true). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). - --ifdef(BUG19758). - -deliver(QPids, Delivery) -> - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)). - --else. - -deliver(QPids, Delivery) -> - %% we reduce inter-node traffic by grouping the qpids by node and - %% only delivering one copy of the message to each node involved, - %% which then in turn delivers it to its queues. - deliver_per_node( - dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), - Delivery). - -deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> - %% optimisation - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)); -deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver in run_bindings below will deliver the + %% rabbit_amqqueue:deliver will deliver the %% message to the queue process asynchronously, and return true, %% which means all the QPids will always be returned. It is %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - {routed, - lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), - QPids - end, - NodeQPids)}; -deliver_per_node(NodeQPids, Delivery) -> - R = rabbit_misc:upmap( - fun ({Node, QPids}) -> - try gen_server2:call({?SERVER, Node}, - {deliver, QPids, Delivery}, - infinity) - catch - _Class:_Reason -> - %% TODO: figure out what to log (and do!) here - {false, []} - end - end, - NodeQPids), + delegate:delegate_async(QPids, + fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + {routed, QPids}; + +deliver(QPids, Delivery) -> + Res = delegate:delegate_sync(QPids, + fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = - lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) -> - {Routed or RoutedAcc, - %% we do the concatenation below, which - %% should be faster - [Handled | HandledAcc]} - end, - {false, []}, - R), + lists:foldl(fun fold_deliveries/2, {false, []}, Res), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, lists:append(Handled)}). - --endif. + {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange @@ -174,44 +110,9 @@ lookup_qpids(Queues) -> %%-------------------------------------------------------------------- -init([]) -> - {ok, no_state}. - -handle_call({deliver, QPids, Delivery}, From, State) -> - spawn( - fun () -> - R = run_bindings(QPids, Delivery), - gen_server2:reply(From, R) - end), - {noreply, State}. - -handle_cast({deliver, QPids, Delivery}, State) -> - %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Delivery), - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -run_bindings(QPids, Delivery) -> - lists:foldl( - fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(QPid, Delivery) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', _Reason} -> {Routed, Handled} - end - end, - {false, []}, - QPids). +fold_deliveries({ok, true, Pid}, {_, Handled}) -> {true, [Pid | Handled]}; +fold_deliveries({ok, false, _}, {_, Handled}) -> {true, Handled}; +fold_deliveries({error, _, _}, {Routed, Handled}) -> {Routed, Handled}. %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) check_delivery(true, _ , {false, []}) -> {unroutable, []}; |