diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-04-27 13:18:37 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-04-27 13:18:37 +0100 |
commit | 58a35a817260cb41dec2236e52ec3e27a906cc9c (patch) | |
tree | d36a6ed3274a5719466a2661af9a7dd4d5b53c96 | |
parent | c0991485aa85e0211d16cac24b07d8fa1432acbe (diff) | |
parent | 305dc5866712547c982b4567774ce1b9910fb335 (diff) | |
download | rabbitmq-server-58a35a817260cb41dec2236e52ec3e27a906cc9c.tar.gz |
Merging default into bug 19844
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | src/delegate.erl | 194 | ||||
-rw-r--r-- | src/delegate_sup.erl | 63 | ||||
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 74 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 11 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 1 | ||||
-rw-r--r-- | src/rabbit_router.erl | 127 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 113 |
9 files changed, 430 insertions, 164 deletions
@@ -167,6 +167,9 @@ start-cover: all echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL) echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) +start-secondary-cover: all + echo "rabbit_misc:start_cover([\"hare\"])." | $(ERL_CALL) + stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) cat cover/summary.txt diff --git a/src/delegate.erl b/src/delegate.erl new file mode 100644 index 00000000..f3c3f097 --- /dev/null +++ b/src/delegate.erl @@ -0,0 +1,194 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate). +-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). + +-behaviour(gen_server2). + +-export([start_link/1, cast/2, call/2, + gs2_call/3, gs2_pcall/4, + gs2_cast/2, gs2_pcast/3, + server/1, process_count/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(serverref() :: atom() | {atom(), atom()} | {'global', term()} | pid()). + +-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). +-spec(cast/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). +-spec(call/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). + +-spec(gs2_call/3 :: + (serverref(), any(), non_neg_integer() | 'infinity') -> any()). +-spec(gs2_pcall/4 :: + (serverref(), number(), any(), non_neg_integer() | 'infinity') -> any()). +-spec(gs2_cast/2 :: (serverref(), any()) -> 'ok'). +-spec(gs2_pcast/3 :: (serverref(), number(), any()) -> 'ok'). + +-spec(server/1 :: (node() | non_neg_integer()) -> atom()). +-spec(process_count/0 :: () -> non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Hash) -> + gen_server2:start_link({local, server(Hash)}, + ?MODULE, [], []). + +gs2_call(Pid, Msg, Timeout) -> + {_Status, Res} = + call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), + Res. + +gs2_pcall(Pid, Pri, Msg, Timeout) -> + {_Status, Res} = + call(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), + Res. + +gs2_cast(Pid, Msg) -> + cast(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + +gs2_pcast(Pid, Pri, Msg) -> + cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + + +call(Pid, FPid) when is_pid(Pid) -> + [{Status, Res, _}] = call_per_node([{node(Pid), [Pid]}], FPid), + {Status, Res}; + +call(Pids, FPid) when is_list(Pids) -> + call_per_node(split_delegate_per_node(Pids), FPid). + +internal_call(Node, Thunk) when is_atom(Node) -> + gen_server2:call({server(Node), Node}, {thunk, Thunk}, infinity). + + +cast(Pid, FPid) when is_pid(Pid) -> + cast_per_node([{node(Pid), [Pid]}], FPid), + ok; + +cast(Pids, FPid) when is_list(Pids) -> + cast_per_node(split_delegate_per_node(Pids), FPid), + ok. + +internal_cast(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({server(Node), Node}, {thunk, Thunk}). + +%%---------------------------------------------------------------------------- + +split_delegate_per_node(Pids) -> + orddict:to_list( + lists:foldl( + fun (Pid, D) -> + orddict:update(node(Pid), + fun (Pids1) -> [Pid | Pids1] end, + [Pid], D) + end, + orddict:new(), Pids)). + +call_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +call_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_call/2). + +cast_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +cast_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_cast/2). + +local_delegate(Pids, FPid) -> + [safe_invoke(FPid, Pid) || Pid <- Pids]. + +delegate_per_node(NodePids, FPid, DelegateFun) -> + lists:flatten( + [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) + || {Node, Pids} <- NodePids]). + +server(Node) when is_atom(Node) -> + server(erlang:phash2(self(), process_count(Node))); + +server(Hash) -> + list_to_atom("delegate_process_" ++ integer_to_list(Hash)). + +safe_invoke(FPid, Pid) -> + case catch FPid(Pid) of + {'EXIT', Reason} -> + {error, {'EXIT', Reason}, Pid}; + Result -> + {ok, Result, Pid} + end. + +process_count(Node) -> + case get({process_count, Node}) of + undefined -> + case rpc:call(Node, delegate, process_count, []) of + {badrpc, _} -> + 1; % Have to return something, if we're just casting then + % we don't want to blow up + Count -> + put({process_count, Node}, Count), + Count + end; + Count -> Count + end. + +process_count() -> + ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). + +%%-------------------------------------------------------------------- + +init([]) -> + {ok, no_state}. + +handle_call({thunk, Thunk}, _From, State) -> + {reply, Thunk(), State}. + +handle_cast({thunk, Thunk}, State) -> + catch Thunk(), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl new file mode 100644 index 00000000..1f351406 --- /dev/null +++ b/src/delegate_sup.erl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%-------------------------------------------------------------------- + +init(_Args) -> + {ok, {{one_for_one, 10, 10}, + [{delegate:server(Hash), {delegate, start_link, [Hash]}, + transient, 16#ffffffff, worker, [delegate]} || + Hash <- lists:seq(0, delegate:process_count() - 1)]}}. + +%%-------------------------------------------------------------------- diff --git a/src/rabbit.erl b/src/rabbit.erl index bbda29c9..40f38b3b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -99,10 +99,10 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_router, - [{description, "cluster router"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_router]}}, +-rabbit_boot_step({delegate_sup, + [{description, "cluster delegate"}, + {mfa, {rabbit_sup, start_child, + [delegate_sup]}}, {requires, kernel_ready}, {enables, core_initialized}]}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5f045b27..23a4932a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -91,7 +91,7 @@ -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). --spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). @@ -225,10 +225,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, info, infinity). + delegate:gs2_pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of + case delegate:gs2_pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -238,7 +238,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, consumers, infinity). + delegate:gs2_pcall(QPid, 9, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -247,15 +247,16 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> delegate:gs2_call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate:gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> + delegate:gs2_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, txn = Txn, sender = ChPid, message = Message}) -> @@ -270,25 +271,23 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + delegate:gs2_cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate:gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn, ChPid) -> - safe_pmap_ok( + safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). rollback_all(QPids, Txn, ChPid) -> - safe_pmap_ok( - fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end, - QPids). + delegate:cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end). notify_down_all(QPids, ChPid) -> - safe_pmap_ok( + safe_delegate_call_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, @@ -296,38 +295,34 @@ notify_down_all(QPids, ChPid) -> QPids). limit_all(QPids, ChPid, LimiterPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, - QPids). + delegate:cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). + delegate:gs2_call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate:gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate:gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate:gs2_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). + delegate:gs2_pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {unblock, ChPid}). + delegate:gs2_pcast(QPid, 7, {unblock, ChPid}). flush_all(QPids, ChPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, - QPids). + delegate:cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). internal_delete(QueueName) -> case @@ -373,17 +368,14 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -safe_pmap_ok(H, F, L) -> - case [R || R <- rabbit_misc:upmap( - fun (V) -> - try - rabbit_misc:with_exit_handler( - fun () -> H(V) end, - fun () -> F(V) end) - catch Class:Reason -> {Class, Reason} - end - end, L), - R =/= ok] of +safe_delegate_call_ok(H, F, Pids) -> + case [R || R = {error, _, _} <- delegate:call( + Pids, + fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> H(Pid) end, + fun () -> F(Pid) end) + end)] of [] -> ok; Errors -> {error, Errors} end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7d3cd722..1f16ec08 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -944,13 +944,10 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey, self()) of - ok -> NewUAMQ = queue:join(UAQ, UAMQ), - new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> rabbit_misc:protocol_error( - internal_error, "rollback failed: ~w", [Errors]) - end. + ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey, self()), + NewUAMQ = queue:join(UAQ, UAMQ), + new_tx(State#ch{unacked_message_q = NewUAMQ}). rollback_and_notify(State = #ch{transaction_id = none}) -> notify_queues(State); diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2c180846..119808af 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -219,6 +219,7 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). + enable_cover() -> enable_cover("."). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index a449e19e..6a886eac 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,100 +33,40 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --behaviour(gen_server2). - --export([start_link/0, - deliver/2, +-export([deliver/2, match_bindings/2, match_routing_key/2]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% cross-node routing optimisation is disabled because of bug 19758. --define(BUG19758, true). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). - --ifdef(BUG19758). - -deliver(QPids, Delivery) -> - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)). - --else. - -deliver(QPids, Delivery) -> - %% we reduce inter-node traffic by grouping the qpids by node and - %% only delivering one copy of the message to each node involved, - %% which then in turn delivers it to its queues. - deliver_per_node( - dict:to_list( - lists:foldl(fun (QPid, D) -> - rabbit_misc:dict_cons(node(QPid), QPid, D) - end, dict:new(), QPids)), - Delivery). - -deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> - %% optimisation - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)); -deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver in run_bindings below will deliver the + %% rabbit_amqqueue:deliver will deliver the %% message to the queue process asynchronously, and return true, %% which means all the QPids will always be returned. It is %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - {routed, - lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), - QPids - end, - NodeQPids)}; -deliver_per_node(NodeQPids, Delivery) -> - R = rabbit_misc:upmap( - fun ({Node, QPids}) -> - try gen_server2:call({?SERVER, Node}, - {deliver, QPids, Delivery}, - infinity) - catch - _Class:_Reason -> - %% TODO: figure out what to log (and do!) here - {false, []} - end - end, - NodeQPids), + delegate:cast(QPids, + fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + {routed, QPids}; + +deliver(QPids, Delivery) -> + Res = delegate:call(QPids, + fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = - lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) -> - {Routed or RoutedAcc, - %% we do the concatenation below, which - %% should be faster - [Handled | HandledAcc]} - end, - {false, []}, - R), + lists:foldl(fun fold_deliveries/2, {false, []}, Res), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, lists:append(Handled)}). - --endif. + {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange @@ -170,44 +110,9 @@ lookup_qpids(Queues) -> %%-------------------------------------------------------------------- -init([]) -> - {ok, no_state}. - -handle_call({deliver, QPids, Delivery}, From, State) -> - spawn( - fun () -> - R = run_bindings(QPids, Delivery), - gen_server2:reply(From, R) - end), - {noreply, State}. - -handle_cast({deliver, QPids, Delivery}, State) -> - %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Delivery), - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -run_bindings(QPids, Delivery) -> - lists:foldl( - fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(QPid, Delivery) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', _Reason} -> {Routed, Handled} - end - end, - {false, []}, - QPids). +fold_deliveries({ok, true, Pid},{_, Handled}) -> {true, [Pid|Handled]}; +fold_deliveries({ok, false, _ },{_, Handled}) -> {true, Handled}; +fold_deliveries({error, _ , _ },{Routed, Handled}) -> {Routed, Handled}. %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) check_delivery(true, _ , {false, []}) -> {unroutable, []}; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d645d183..5ed7d64c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -61,7 +61,32 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), - passed = test_hooks(), + passed = maybe_run_cluster_dependent_tests(), + passed. + + +maybe_run_cluster_dependent_tests() -> + SecondaryNode = rabbit_misc:makenode("hare"), + + case net_adm:ping(SecondaryNode) of + pong -> passed = run_cluster_dependent_tests(SecondaryNode); + pang -> io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]) + end, + passed. + +run_cluster_dependent_tests(SecondaryNode) -> + SecondaryNodeS = atom_to_list(SecondaryNode), + + ok = control_action(stop_app, []), + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), + ok = control_action(start_app, []), + + io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), + passed = test_delegates_async(SecondaryNode), + passed = test_delegates_sync(SecondaryNode), + passed. test_priority_queue() -> @@ -815,6 +840,92 @@ test_hooks() -> end, passed. +test_delegates_async(SecondaryNode) -> + Self = self(), + Sender = fun(Pid) -> Pid ! {invoked, Self} end, + + Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end), + + ok = delegate:cast(spawn(Responder), Sender), + ok = delegate:cast(spawn(SecondaryNode, Responder), Sender), + await_response(2), + + LocalPids = spawn_responders(node(), Responder, 10), + RemotePids = spawn_responders(SecondaryNode, Responder, 10), + ok = delegate:cast(LocalPids ++ RemotePids, Sender), + await_response(20), + + passed. + +make_responder(FMsg) -> + fun() -> + receive + Msg -> + FMsg(Msg) + after 100 -> + throw(timeout) + end + end. + +spawn_responders(Node, Responder, Count) -> + [spawn(Node, Responder) || _ <- lists:seq(1, Count)]. + +await_response(0) -> + ok; + +await_response(Count) -> + receive + response -> ok, + await_response(Count - 1) + after 100 -> + io:format("Async reply not received~n"), + throw(timeout) + end. + +test_delegates_sync(SecondaryNode) -> + Sender = fun(Pid) -> + gen_server2:call(Pid, invoked) + end, + + Responder = make_responder(fun({'$gen_call', From, invoked}) -> + gen_server2:reply(From, response) + end), + + BadResponder = make_responder(fun({'$gen_call', _From, invoked}) -> + throw(exception) + end), + + {ok, response} = delegate:call(spawn(Responder), Sender), + {ok, response} = delegate:call(spawn(SecondaryNode, Responder), Sender), + + {error, _} = delegate:call(spawn(BadResponder), Sender), + {error, _} = delegate:call(spawn(SecondaryNode, BadResponder), Sender), + + LocalGoodPids = spawn_responders(node(), Responder, 2), + RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), + LocalBadPids = spawn_responders(node(), BadResponder, 2), + RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2), + + GoodRes = delegate:call(LocalGoodPids ++ RemoteGoodPids, Sender), + [{ok, response, _}, {ok, response, _}, + {ok, response, _}, {ok, response, _}] = GoodRes, + + BadRes = delegate:call(LocalBadPids ++ RemoteBadPids, Sender), + [{error, _, _}, {error, _, _}, + {error, _, _}, {error, _, _}] = BadRes, + + GoodResPids = [Pid || {_, _, Pid} <- GoodRes], + BadResPids = [Pid || {_, _, Pid} <- BadRes], + + Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), + Good = ordsets:from_list(GoodResPids), + + Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), + Bad = ordsets:from_list(BadResPids), + + passed. + + %--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args). |