summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/delegate.erl27
-rw-r--r--src/delegate_sup.erl2
2 files changed, 17 insertions, 12 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index ff55a15b..46bd8245 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,7 +36,7 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-endif.
@@ -68,9 +68,9 @@ invoke(Pids, Fun) when is_list(Pids) ->
{Replies, BadNodes} =
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
- RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
- {invoke, Fun, Grouped},
- infinity)
+ RemoteNodes -> gen_server2:multi_call(
+ RemoteNodes, delegate(RemoteNodes),
+ {invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
@@ -92,7 +92,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
@@ -111,17 +111,22 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count() ->
- {ok, Count} = application:get_env(rabbit, delegate_count),
+delegate_count([RemoteNode | _]) ->
+ {ok, Count} = case application:get_env(rabbit, delegate_count) of
+ undefined -> rpc:call(RemoteNode, application, get_env,
+ [rabbit, delegate_count]);
+ Result -> Result
+ end,
Count.
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate() ->
+delegate(RemoteNodes) ->
case get(delegate) of
- undefined -> Name = delegate_name(
- erlang:phash2(self(), delegate_count())),
+ undefined -> Name =
+ delegate_name(erlang:phash2(
+ self(), delegate_count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 52747221..e0ffa7c8 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -40,7 +40,7 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- DCount = delegate:delegate_count(),
+ DCount = delegate:delegate_count([node()]),
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||