summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-04-29 15:51:24 +0100
committerSimon MacMullen <simon@lshift.net>2010-04-29 15:51:24 +0100
commit84f04efe9d36f101e79414a2ebb992f1d3bcd4ba (patch)
tree0b37e99032369d9ef45c9ce722ab4f1b07fd1d71
parentbd1e95edc964c8c9954e6e254d7a3395039e1c75 (diff)
downloadrabbitmq-server-84f04efe9d36f101e79414a2ebb992f1d3bcd4ba.tar.gz
Commit 0ab19fce3677 broke effect visibility again! This time, go through a predictable process on the local side.
-rw-r--r--src/delegate.erl29
1 files changed, 18 insertions, 11 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 6dd9d397..8597595f 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -47,7 +47,6 @@
-spec(invoke_async/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
-spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
--spec(server/1 :: (node() | non_neg_integer()) -> atom()).
-spec(process_count/0 :: () -> non_neg_integer()).
-endif.
@@ -76,10 +75,10 @@ invoke_async(Pids, FPid) when is_list(Pids) ->
%%----------------------------------------------------------------------------
internal_call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({server(Node), Node}, {thunk, Thunk}, infinity).
+ gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
internal_cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({server(Node), Node}, {thunk, Thunk}).
+ gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
orddict:to_list(
@@ -112,11 +111,10 @@ local_delegate(Pids, FPid) ->
delegate_per_node(NodePids, FPid, DelegateFun) ->
Self = self(),
- [spawn(
- fun() -> Self ! {result,
- DelegateFun(Node,
+ [gen_server2:cast(local_server(Node), {thunk, fun() ->
+ Self ! {result, DelegateFun(Node,
fun() -> local_delegate(Pids, FPid) end)}
- end) || {Node, Pids} <- NodePids],
+ end}) || {Node, Pids} <- NodePids],
gather_results([], length(NodePids)).
gather_results(ResultsAcc, 0) ->
@@ -126,8 +124,17 @@ gather_results(ResultsAcc, ToGo) ->
{result, Result} -> gather_results([Result | ResultsAcc], ToGo - 1)
end.
-server(Node) when is_atom(Node) ->
- case get({delegate_server_name, Node}) of
+local_server(Node) ->
+ case get({delegate_local_server_name, Node}) of
+ undefined ->
+ Name = server(erlang:phash2(Node, process_count())),
+ put({delegate_local_server_name, Node}, Name),
+ Name;
+ Name -> Name
+ end.
+
+remote_server(Node) ->
+ case get({delegate_remote_server_name, Node}) of
undefined ->
case rpc:call(Node, delegate, process_count, []) of
{badrpc, _} ->
@@ -136,11 +143,11 @@ server(Node) when is_atom(Node) ->
% blow up
Count ->
Name = server(erlang:phash2(self(), Count)),
- put({delegate_server_name, Node}, Name),
+ put({delegate_remote_server_name, Node}, Name),
Name
end;
Name -> Name
- end;
+ end.
server(Hash) ->
list_to_atom("delegate_process_" ++ integer_to_list(Hash)).