diff options
author | Simon MacMullen <simon@lshift.net> | 2010-04-28 12:38:27 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-04-28 12:38:27 +0100 |
commit | ed59c8af0c29c90c8310c621555965c88d534d10 (patch) | |
tree | 39c0ad24d91d8305c85077174f84be4095cb6061 /src/delegate.erl | |
parent | 0a60ed986a73c267a7c09a2297dc4329e6fbfa88 (diff) | |
download | rabbitmq-server-ed59c8af0c29c90c8310c621555965c88d534d10.tar.gz |
Parallelise communication with multiple nodes.
Diffstat (limited to 'src/delegate.erl')
-rw-r--r-- | src/delegate.erl | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 71287496..88abb20b 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -44,8 +44,6 @@ -ifdef(use_specs). --type(serverref() :: atom() | {atom(), atom()} | {'global', term()} | pid()). - -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). -spec(invoke_async/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). @@ -97,20 +95,32 @@ split_delegate_per_node(Pids) -> invoke_per_node([{Node, Pids}], FPid) when Node == node() -> local_delegate(Pids, FPid); invoke_per_node(NodePids, FPid) -> - delegate_per_node(NodePids, FPid, fun internal_call/2). + lists:append(delegate_per_node(NodePids, FPid, fun internal_call/2)). invoke_async_per_node([{Node, Pids}], FPid) when Node == node() -> local_delegate(Pids, FPid); invoke_async_per_node(NodePids, FPid) -> - delegate_per_node(NodePids, FPid, fun internal_cast/2). + delegate_per_node(NodePids, FPid, fun internal_cast/2), + ok. local_delegate(Pids, FPid) -> [safe_invoke(FPid, Pid) || Pid <- Pids]. delegate_per_node(NodePids, FPid, DelegateFun) -> - lists:flatten( - [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) - || {Node, Pids} <- NodePids]). + Self = self(), + [spawn(fun() -> + Self ! {result, DelegateFun(Node, + fun() -> local_delegate(Pids, FPid) end)} + end) || {Node, Pids} <- NodePids], + gather_results([], length(NodePids)). + +gather_results(ResultsAcc, 0) -> + ResultsAcc; + +gather_results(ResultsAcc, ToGo) -> + receive {result, Result} -> + gather_results([Result | ResultsAcc], ToGo - 1) + end. server(Node) when is_atom(Node) -> case get({delegate_server_name, Node}) of |