diff options
author | Tim Watson <tim@rabbitmq.com> | 2014-01-27 16:50:00 +0000 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2014-01-27 16:50:00 +0000 |
commit | d079a8987c06c98616fcdb18360beba879d9c767 (patch) | |
tree | 7e76674a2236e5a1afdfd6d8c856338e916e7d68 | |
parent | fe0851e4573f2abe13c847e2f3c3f36c3f307d39 (diff) | |
download | rabbitmq-server-d079a8987c06c98616fcdb18360beba879d9c767.tar.gz |
Introduce a new parallel (multi-server) call API
-rw-r--r-- | src/gen_server2.erl | 53 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 23 |
2 files changed, 76 insertions, 0 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 6690d181..57234c92 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -190,6 +190,7 @@ cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, + mcall/1, with_state/2, enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). @@ -389,6 +390,58 @@ multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). +%%% ----------------------------------------------------------------- +%%% Make multiple calls to multiple servers, given pairs of servers +%%% and messages. +%%% Returns: {[{Pid, Reply}], [{Pid, Error}]} +%%% +%%% A middleman process is used to avoid clogging up the callers +%%% message queue. +%%% ----------------------------------------------------------------- +mcall(CallSpecs) -> + {Receiver, MRef} = spawn_monitor( + fun() -> + Refs = lists:foldl(fun do_mcall/2, dict:new(), + CallSpecs), + collect_replies(Refs, [], []) + end), + receive + {'DOWN', MRef, _, _, {Receiver, Result}} -> Result; + {'DOWN', MRef, _, _, Reason} -> exit(Reason) + end. + +do_mcall({Pid, Request}, Dict) -> + MRef = erlang:monitor(process, Pid), + catch erlang:send(Pid, {'$gen_call', {self(), MRef}, Request}, + [noconnect]), + dict:store(MRef, Pid, Dict). + +collect_replies(Refs, Replies, Errors) -> + case dict:size(Refs) of + 0 -> exit({self(), {Replies, Errors}}); + _ -> receive + {MRef, Reply} -> + {Refs1, Replies1} = handle_call_result(MRef, Reply, + Refs, Replies), + collect_replies(Refs1, Replies1, Errors); + {'DOWN', MRef, _, _, Reason} -> + Reason1 = case Reason of + noconnection -> nodedown; + _ -> Reason + end, + {Refs1, Errors1} = handle_call_result(MRef, Reason1, + Refs, Errors), + collect_replies(Refs1, Replies, Errors1) + end + end. + +handle_call_result(MRef, Result, Refs, AccList) -> + %% we use fetch instead of find, because we *do* want to crash if some + %% unexpected monitor signal arrives in our inbox! + Pid = dict:fetch(MRef, Refs), + erlang:demonitor(MRef, [flush]), + {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}. + %% ----------------------------------------------------------------- %% Apply a function to a generic server's state. %% ----------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2d6ff73b..a5e91f67 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -66,6 +66,7 @@ all_tests() -> passed = test_amqp_connection_refusal(), passed = test_confirms(), passed = test_with_state(), + passed = test_mcall(), passed = do_if_secondary_node( fun run_cluster_dependent_tests/1, @@ -1368,6 +1369,28 @@ test_with_state() -> fun (S) -> element(1, S) end), passed. +test_mcall() -> + Pids = [spawn_link(fun gs2_test_listener/0) || _ <- lists:seq(1, 250)], + BadPids = [spawn(fun gs2_test_crasher/0) || _ <- lists:seq(1, 10)], + {Replies, Errors} = gen_server2:mcall([{P, hello} || P <- Pids ++ BadPids]), + true = lists:sort(Replies) == lists:sort([{Pid, goodbye} || Pid <- Pids]), + true = lists:sort(Errors) == lists:sort([{Pid, boom} || Pid <- BadPids]), + passed. + +gs2_test_crasher() -> + receive + {'$gen_call', _From, hello} -> exit(boom) + end. + +gs2_test_listener() -> + receive + {'$gen_call', From, hello} -> + gen_server2:reply(From, goodbye), + gs2_test_listener(); + Other -> + exit(Other) + end. + test_statistics_event_receiver(Pid) -> receive Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) |