diff options
author | Simon MacMullen <simon@lshift.net> | 2010-04-22 16:41:43 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-04-22 16:41:43 +0100 |
commit | 38848c3c625f000e92352f4996a8e45c6806f2b1 (patch) | |
tree | e945709c36751dfdfdefc58299ae28777ccb25ba /src/delegate.erl | |
parent | 44ecde08790dcdc2f9fee001f728394cbb72c119 (diff) | |
download | rabbitmq-server-38848c3c625f000e92352f4996a8e45c6806f2b1.tar.gz |
Use 2x CPUs as the number of delegate processes.
Diffstat (limited to 'src/delegate.erl')
-rw-r--r-- | src/delegate.erl | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 2724736e..03dd06ac 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -30,14 +30,14 @@ %% -module(delegate). --include("delegate.hrl"). +-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). -behaviour(gen_server2). -export([start_link/1, cast/2, call/2, gs2_call/3, gs2_pcall/4, gs2_cast/2, gs2_pcast/3, - server/1]). + server/1, process_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -76,7 +76,7 @@ call(Pids, FPid) when is_list(Pids) -> call_per_node(split_delegate_per_node(Pids), FPid). internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({server(), Node}, {thunk, Thunk}, infinity). + gen_server2:call({server(Node), Node}, {thunk, Thunk}, infinity). cast(Pid, FPid) when is_pid(Pid) -> @@ -88,7 +88,7 @@ cast(Pids, FPid) when is_list(Pids) -> ok. internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({server(), Node}, {thunk, Thunk}). + gen_server2:cast({server(Node), Node}, {thunk, Thunk}). %%---------------------------------------------------------------------------- @@ -120,8 +120,8 @@ delegate_per_node(NodePids, FPid, DelegateFun) -> [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) || {Node, Pids} <- NodePids]). -server() -> - server(erlang:phash2(self(), ?DELEGATE_PROCESSES)). +server(Node) when is_atom(Node) -> + server(erlang:phash2(self(), process_count(Node))); server(Hash) -> list_to_atom("delegate_process_" ++ integer_to_list(Hash)). @@ -134,6 +134,18 @@ safe_invoke(FPid, Pid) -> {ok, Result, Pid} end. +process_count(Node) -> + case get({process_count, Node}) of + undefined -> + Count = rpc:call(Node, delegate, process_count, []), + put({process_count, Node}, Count), + Count; + Count -> Count + end. + +process_count() -> + ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). + %%-------------------------------------------------------------------- init([]) -> |