diff options
-rw-r--r-- | src/delegate.erl | 27 | ||||
-rw-r--r-- | src/delegate_sup.erl | 2 |
2 files changed, 17 insertions, 12 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index ff55a15b..46bd8245 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,7 +36,7 @@ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). --spec(delegate_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()). -endif. @@ -68,9 +68,9 @@ invoke(Pids, Fun) when is_list(Pids) -> {Replies, BadNodes} = case orddict:fetch_keys(Grouped) of [] -> {[], []}; - RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), - {invoke, Fun, Grouped}, - infinity) + RemoteNodes -> gen_server2:multi_call( + RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, @@ -92,7 +92,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die @@ -111,17 +111,22 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], orddict:new()}, Pids). -delegate_count() -> - {ok, Count} = application:get_env(rabbit, delegate_count), +delegate_count([RemoteNode | _]) -> + {ok, Count} = case application:get_env(rabbit, delegate_count) of + undefined -> rpc:call(RemoteNode, application, get_env, + [rabbit, delegate_count]); + Result -> Result + end, Count. delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate() -> +delegate(RemoteNodes) -> case get(delegate) of - undefined -> Name = delegate_name( - erlang:phash2(self(), delegate_count())), + undefined -> Name = + delegate_name(erlang:phash2( + self(), delegate_count(RemoteNodes))), put(delegate, Name), Name; Name -> Name diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 52747221..e0ffa7c8 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -40,7 +40,7 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - DCount = delegate:delegate_count(), + DCount = delegate:delegate_count([node()]), {ok, {{one_for_one, 10, 10}, [{Num, {delegate, start_link, [Num]}, transient, 16#ffffffff, worker, [delegate]} || |