diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-10-25 16:35:22 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-10-25 16:35:22 +0100 |
commit | dbea323eaf57a5a4408e1fb237e6e381d3df79bd (patch) | |
tree | e5140782d41fe2892d72dc29d0333f810ca4fce3 /src/delegate.erl | |
parent | 2857a50b6c63d1187dad2959c322e05958226336 (diff) | |
download | rabbitmq-server-dbea323eaf57a5a4408e1fb237e6e381d3df79bd.tar.gz |
Use two pools of delegates, one for each of outgoing and incoming messages. Prevents deadlocks.bug23429
Diffstat (limited to 'src/delegate.erl')
-rw-r--r-- | src/delegate.erl | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index c8aa3092..e50b99f1 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,7 +44,8 @@ -ifdef(use_specs). --spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/2 :: + (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). @@ -60,8 +61,8 @@ %%---------------------------------------------------------------------------- -start_link(Hash) -> - gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). +start_link(Prefix, Hash) -> + gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), @@ -147,7 +148,8 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) -> local_server(Node) -> case get({delegate_local_server_name, Node}) of undefined -> - Name = server(erlang:phash2({self(), Node}, process_count())), + Name = server(outgoing, + erlang:phash2({self(), Node}, process_count())), put({delegate_local_server_name, Node}, Name), Name; Name -> Name @@ -160,17 +162,20 @@ remote_server(Node) -> {badrpc, _} -> %% Have to return something, if we're just casting %% then we don't want to blow up - server(1); + server(incoming, 1); Count -> - Name = server(erlang:phash2({self(), Node}, Count)), + Name = server(incoming, + erlang:phash2({self(), Node}, Count)), put({delegate_remote_server_name, Node}, Name), Name end; Name -> Name end. -server(Hash) -> - list_to_atom("delegate_process_" ++ integer_to_list(Hash)). +server(Prefix, Hash) -> + list_to_atom("delegate_" ++ + atom_to_list(Prefix) ++ "_" ++ + integer_to_list(Hash)). safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; |