summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-04-22 16:41:43 +0100
committerSimon MacMullen <simon@lshift.net>2010-04-22 16:41:43 +0100
commit38848c3c625f000e92352f4996a8e45c6806f2b1 (patch)
treee945709c36751dfdfdefc58299ae28777ccb25ba /src/delegate.erl
parent44ecde08790dcdc2f9fee001f728394cbb72c119 (diff)
downloadrabbitmq-server-38848c3c625f000e92352f4996a8e45c6806f2b1.tar.gz
Use 2x CPUs as the number of delegate processes.
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl24
1 files changed, 18 insertions, 6 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 2724736e..03dd06ac 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -30,14 +30,14 @@
%%
-module(delegate).
--include("delegate.hrl").
+-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
-behaviour(gen_server2).
-export([start_link/1, cast/2, call/2,
gs2_call/3, gs2_pcall/4,
gs2_cast/2, gs2_pcast/3,
- server/1]).
+ server/1, process_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -76,7 +76,7 @@ call(Pids, FPid) when is_list(Pids) ->
call_per_node(split_delegate_per_node(Pids), FPid).
internal_call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({server(), Node}, {thunk, Thunk}, infinity).
+ gen_server2:call({server(Node), Node}, {thunk, Thunk}, infinity).
cast(Pid, FPid) when is_pid(Pid) ->
@@ -88,7 +88,7 @@ cast(Pids, FPid) when is_list(Pids) ->
ok.
internal_cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({server(), Node}, {thunk, Thunk}).
+ gen_server2:cast({server(Node), Node}, {thunk, Thunk}).
%%----------------------------------------------------------------------------
@@ -120,8 +120,8 @@ delegate_per_node(NodePids, FPid, DelegateFun) ->
[DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end)
|| {Node, Pids} <- NodePids]).
-server() ->
- server(erlang:phash2(self(), ?DELEGATE_PROCESSES)).
+server(Node) when is_atom(Node) ->
+ server(erlang:phash2(self(), process_count(Node)));
server(Hash) ->
list_to_atom("delegate_process_" ++ integer_to_list(Hash)).
@@ -134,6 +134,18 @@ safe_invoke(FPid, Pid) ->
{ok, Result, Pid}
end.
+process_count(Node) ->
+ case get({process_count, Node}) of
+ undefined ->
+ Count = rpc:call(Node, delegate, process_count, []),
+ put({process_count, Node}, Count),
+ Count;
+ Count -> Count
+ end.
+
+process_count() ->
+ ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
+
%%--------------------------------------------------------------------
init([]) ->