diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-04-30 17:35:01 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-04-30 17:35:01 +0100 |
commit | aeab61b1e893304ccc576bf7c4d4c90e4e9742be (patch) | |
tree | 1a642fb94690068a0b4f76f96ff4b3249a596d91 /src/delegate.erl | |
parent | 4cddf2bf0efdd35a96d6002023a172be4d1dc7b8 (diff) | |
download | rabbitmq-server-aeab61b1e893304ccc576bf7c4d4c90e4e9742be.tar.gz |
Cosmetics and minor refactorings
Diffstat (limited to 'src/delegate.erl')
-rw-r--r-- | src/delegate.erl | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index c9826f0d..c4ff764d 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -53,9 +53,13 @@ %%---------------------------------------------------------------------------- +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + start_link(Hash) -> - gen_server2:start_link({local, server(Hash)}, - ?MODULE, [], []). + gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). invoke(Pid, FPid) when is_pid(Pid) -> [{Status, Res, _}] = invoke_per_node([{node(Pid), [Pid]}], FPid), @@ -96,11 +100,11 @@ invoke_per_node(NodePids, FPid) -> lists:append(delegate_per_node(NodePids, FPid, fun internal_call/2)). invoke_no_result_per_node([{Node, Pids}], FPid) when Node == node() -> - % This is not actually async! However, in practice FPid will always be - % something that does a gen_server:cast or similar, so I don't think - % it's a problem unless someone misuses this function. Making this - % *actually* async would be painful as we can't spawn at this point or we - % break effect ordering. + %% This is not actually async! However, in practice FPid will + %% always be something that does a gen_server:cast or similar, so + %% I don't think it's a problem unless someone misuses this + %% function. Making this *actually* async would be painful as we + %% can't spawn at this point or we break effect ordering. local_delegate(Pids, FPid); invoke_no_result_per_node(NodePids, FPid) -> delegate_per_node(NodePids, FPid, fun internal_cast/2), @@ -111,23 +115,22 @@ local_delegate(Pids, FPid) -> delegate_per_node(NodePids, FPid, DelegateFun) -> Self = self(), - [gen_server2:cast(local_server(Node), {thunk, fun() -> - Self ! {result, DelegateFun(Node, - fun() -> local_delegate(Pids, FPid) end)} - end}) || {Node, Pids} <- NodePids], - gather_results([], length(NodePids)). - -gather_results(ResultsAcc, 0) -> - ResultsAcc; -gather_results(ResultsAcc, ToGo) -> - receive - {result, Result} -> gather_results([Result | ResultsAcc], ToGo - 1) - end. + %% Note that this is unsafe if the FPid requires reentrancy to the + %% local_server. I.e. if self() == local_server(Node) then we'll + %% block forever. + [gen_server2:cast( + local_server(Node), + {thunk, fun() -> Self ! + {result, + DelegateFun( + Node, fun() -> local_delegate(Pids, FPid) end)} + end}) || {Node, Pids} <- NodePids], + [receive {result, Result} -> Result end || _ <- NodePids]. local_server(Node) -> case get({delegate_local_server_name, Node}) of undefined -> - Name = server(erlang:phash2(Node, process_count())), + Name = server(erlang:phash2({self(), Node}, process_count())), put({delegate_local_server_name, Node}, Name), Name; Name -> Name @@ -138,11 +141,11 @@ remote_server(Node) -> undefined -> case rpc:call(Node, delegate, process_count, []) of {badrpc, _} -> - delegate_process_1; % Have to return something, if we're - % just casting then we don't want to - % blow up + %% Have to return something, if we're just casting + %% then we don't want to blow up + server(1); Count -> - Name = server(erlang:phash2(self(), Count)), + Name = server(erlang:phash2({self(), Node}, Count)), put({delegate_remote_server_name, Node}, Name), Name end; @@ -153,14 +156,12 @@ server(Hash) -> list_to_atom("delegate_process_" ++ integer_to_list(Hash)). safe_invoke(FPid, Pid) -> - % We need the catch here for the local case. In the remote case there will - % already have been a catch in handle_ca{ll,st} below, but that's OK, catch - % is idempotent. + %% We need the catch here for the local case. In the remote case + %% there will already have been a catch in handle_ca{ll,st} below, + %% but that's OK, catch is idempotent. case catch FPid(Pid) of - {'EXIT', Reason} -> - {error, {'EXIT', Reason}, Pid}; - Result -> - {ok, Result, Pid} + {'EXIT', Reason} -> {error, {'EXIT', Reason}, Pid}; + Result -> {ok, Result, Pid} end. process_count() -> @@ -169,14 +170,15 @@ process_count() -> %%-------------------------------------------------------------------- init([]) -> - {ok, no_state}. + {ok, no_state, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({thunk, Thunk}, _From, State) -> - {reply, catch Thunk(), State}. + {reply, catch Thunk(), State, hibernate}. handle_cast({thunk, Thunk}, State) -> catch Thunk(), - {noreply, State}. + {noreply, State, hibernate}. handle_info(_Info, State) -> {noreply, State}. |