diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-03-14 17:10:58 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-03-14 17:10:58 +0000 |
commit | 00f507239680640c8f858456cce6942a15de8209 (patch) | |
tree | 4d9a97af40c5ef6a1ed7a8bee81bd4dff99764b4 | |
parent | 8dd3a08a840c85e80891f2c46d486ac26d348d95 (diff) | |
download | rabbitmq-server-00f507239680640c8f858456cce6942a15de8209.tar.gz |
Scatter-gather.
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 28 |
2 files changed, 33 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 43d65aea..977d302f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -433,10 +433,11 @@ force_event_refresh([]) -> force_event_refresh(QNames) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - Results = [catch gen_server2:call(Q#amqqueue.pid, force_event_refresh) || - Q <- Qs], - Failed = [QName || {QName, {'EXIT', _}} <- lists:zip(QNames, Results)], - io:format("Failed: ~p~n", [Failed]), + {_, Bad} = rabbit_misc:multi_call( + [Q#amqqueue.pid || Q <- Qs], force_event_refresh), + FailedPids = [Pid || {Pid, _Reason} <- Bad], + Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, + lists:member(Pid, FailedPids)], timer:sleep(100), force_event_refresh(Failed), ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index dca3bead..196d6da0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -58,6 +58,7 @@ -export([pget/2, pget/3, pget_or_die/2]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). +-export([multi_call/2]). -export([quit/1]). %%---------------------------------------------------------------------------- @@ -200,6 +201,8 @@ -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). +-spec(multi_call/2 :: + ([pid()], any()) -> [{[{pid(), any()}], [{pid(), any()}]}]). -spec(quit/1 :: (integer() | string()) -> no_return()). -endif. @@ -880,6 +883,31 @@ append_rpc_all_nodes(Nodes, M, F, A) -> _ -> Res end || Res <- ResL]). +%% A simplified version of gen_server:multi_call/2 with a sane +%% API. This is not in gen_server2 as there is no useful +%% infrastructure there to share. +multi_call(Pids, Req) -> + MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids], + receive_multi_call(MonitorPids, [], []). + +start_multi_call(Pid, Req) -> + Mref = erlang:monitor(process, Pid), + Pid ! {'$gen_call', {self(), Mref}, Req}, + {Mref, Pid}. + +receive_multi_call([], Good, Bad) -> + {Good, Bad}; +receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) -> + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad); + {'DOWN', Mref, _, _, noconnection} -> + receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]); + {'DOWN', Mref, _, _, Reason} -> + receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad]) + end. + %% the slower shutdown on windows required to flush stdout quit(Status) -> case os:type() of |