diff options
author | Simon MacMullen <simon@lshift.net> | 2010-04-29 15:51:24 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-04-29 15:51:24 +0100 |
commit | 84f04efe9d36f101e79414a2ebb992f1d3bcd4ba (patch) | |
tree | 0b37e99032369d9ef45c9ce722ab4f1b07fd1d71 | |
parent | bd1e95edc964c8c9954e6e254d7a3395039e1c75 (diff) | |
download | rabbitmq-server-84f04efe9d36f101e79414a2ebb992f1d3bcd4ba.tar.gz |
Commit 0ab19fce3677 broke effect visibility again! This time, go through a predictable process on the local side.
-rw-r--r-- | src/delegate.erl | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 6dd9d397..8597595f 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -47,7 +47,6 @@ -spec(invoke_async/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). --spec(server/1 :: (node() | non_neg_integer()) -> atom()). -spec(process_count/0 :: () -> non_neg_integer()). -endif. @@ -76,10 +75,10 @@ invoke_async(Pids, FPid) when is_list(Pids) -> %%---------------------------------------------------------------------------- internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({server(Node), Node}, {thunk, Thunk}, infinity). + gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity). internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({server(Node), Node}, {thunk, Thunk}). + gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). split_delegate_per_node(Pids) -> orddict:to_list( @@ -112,11 +111,10 @@ local_delegate(Pids, FPid) -> delegate_per_node(NodePids, FPid, DelegateFun) -> Self = self(), - [spawn( - fun() -> Self ! {result, - DelegateFun(Node, + [gen_server2:cast(local_server(Node), {thunk, fun() -> + Self ! {result, DelegateFun(Node, fun() -> local_delegate(Pids, FPid) end)} - end) || {Node, Pids} <- NodePids], + end}) || {Node, Pids} <- NodePids], gather_results([], length(NodePids)). gather_results(ResultsAcc, 0) -> @@ -126,8 +124,17 @@ gather_results(ResultsAcc, ToGo) -> {result, Result} -> gather_results([Result | ResultsAcc], ToGo - 1) end. -server(Node) when is_atom(Node) -> - case get({delegate_server_name, Node}) of +local_server(Node) -> + case get({delegate_local_server_name, Node}) of + undefined -> + Name = server(erlang:phash2(Node, process_count())), + put({delegate_local_server_name, Node}, Name), + Name; + Name -> Name + end. + +remote_server(Node) -> + case get({delegate_remote_server_name, Node}) of undefined -> case rpc:call(Node, delegate, process_count, []) of {badrpc, _} -> @@ -136,11 +143,11 @@ server(Node) when is_atom(Node) -> % blow up Count -> Name = server(erlang:phash2(self(), Count)), - put({delegate_server_name, Node}, Name), + put({delegate_remote_server_name, Node}, Name), Name end; Name -> Name - end; + end. server(Hash) -> list_to_atom("delegate_process_" ++ integer_to_list(Hash)). |