summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2014-01-27 16:50:00 +0000
committerTim Watson <tim@rabbitmq.com>2014-01-27 16:50:00 +0000
commitd079a8987c06c98616fcdb18360beba879d9c767 (patch)
tree7e76674a2236e5a1afdfd6d8c856338e916e7d68
parentfe0851e4573f2abe13c847e2f3c3f36c3f307d39 (diff)
downloadrabbitmq-server-d079a8987c06c98616fcdb18360beba879d9c767.tar.gz
Introduce a new parallel (multi-server) call API
-rw-r--r--src/gen_server2.erl53
-rw-r--r--src/rabbit_tests.erl23
2 files changed, 76 insertions, 0 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 6690d181..57234c92 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -190,6 +190,7 @@
cast/2, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
+ mcall/1,
with_state/2,
enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
@@ -389,6 +390,58 @@ multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
+%%% -----------------------------------------------------------------
+%%% Make multiple calls to multiple servers, given pairs of servers
+%%% and messages.
+%%% Returns: {[{Pid, Reply}], [{Pid, Error}]}
+%%%
+%%% A middleman process is used to avoid clogging up the callers
+%%% message queue.
+%%% -----------------------------------------------------------------
+mcall(CallSpecs) ->
+ {Receiver, MRef} = spawn_monitor(
+ fun() ->
+ Refs = lists:foldl(fun do_mcall/2, dict:new(),
+ CallSpecs),
+ collect_replies(Refs, [], [])
+ end),
+ receive
+ {'DOWN', MRef, _, _, {Receiver, Result}} -> Result;
+ {'DOWN', MRef, _, _, Reason} -> exit(Reason)
+ end.
+
+do_mcall({Pid, Request}, Dict) ->
+ MRef = erlang:monitor(process, Pid),
+ catch erlang:send(Pid, {'$gen_call', {self(), MRef}, Request},
+ [noconnect]),
+ dict:store(MRef, Pid, Dict).
+
+collect_replies(Refs, Replies, Errors) ->
+ case dict:size(Refs) of
+ 0 -> exit({self(), {Replies, Errors}});
+ _ -> receive
+ {MRef, Reply} ->
+ {Refs1, Replies1} = handle_call_result(MRef, Reply,
+ Refs, Replies),
+ collect_replies(Refs1, Replies1, Errors);
+ {'DOWN', MRef, _, _, Reason} ->
+ Reason1 = case Reason of
+ noconnection -> nodedown;
+ _ -> Reason
+ end,
+ {Refs1, Errors1} = handle_call_result(MRef, Reason1,
+ Refs, Errors),
+ collect_replies(Refs1, Replies, Errors1)
+ end
+ end.
+
+handle_call_result(MRef, Result, Refs, AccList) ->
+ %% we use fetch instead of find, because we *do* want to crash if some
+ %% unexpected monitor signal arrives in our inbox!
+ Pid = dict:fetch(MRef, Refs),
+ erlang:demonitor(MRef, [flush]),
+ {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}.
+
%% -----------------------------------------------------------------
%% Apply a function to a generic server's state.
%% -----------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2d6ff73b..a5e91f67 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -66,6 +66,7 @@ all_tests() ->
passed = test_amqp_connection_refusal(),
passed = test_confirms(),
passed = test_with_state(),
+ passed = test_mcall(),
passed =
do_if_secondary_node(
fun run_cluster_dependent_tests/1,
@@ -1368,6 +1369,28 @@ test_with_state() ->
fun (S) -> element(1, S) end),
passed.
+test_mcall() ->
+ Pids = [spawn_link(fun gs2_test_listener/0) || _ <- lists:seq(1, 250)],
+ BadPids = [spawn(fun gs2_test_crasher/0) || _ <- lists:seq(1, 10)],
+ {Replies, Errors} = gen_server2:mcall([{P, hello} || P <- Pids ++ BadPids]),
+ true = lists:sort(Replies) == lists:sort([{Pid, goodbye} || Pid <- Pids]),
+ true = lists:sort(Errors) == lists:sort([{Pid, boom} || Pid <- BadPids]),
+ passed.
+
+gs2_test_crasher() ->
+ receive
+ {'$gen_call', _From, hello} -> exit(boom)
+ end.
+
+gs2_test_listener() ->
+ receive
+ {'$gen_call', From, hello} ->
+ gen_server2:reply(From, goodbye),
+ gs2_test_listener();
+ Other ->
+ exit(Other)
+ end.
+
test_statistics_event_receiver(Pid) ->
receive
Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)