diff options
Diffstat (limited to 'src/delegate.erl')
-rw-r--r-- | src/delegate.erl | 209 |
1 files changed, 83 insertions, 126 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 11abe73b..10054e57 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -31,11 +31,9 @@ -module(delegate). --define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). - -behaviour(gen_server2). --export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,13 +42,16 @@ -ifdef(use_specs). --spec(start_link/2 :: - (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). +-spec(invoke/2 :: + ( pid(), fun ((pid()) -> A)) -> A; + ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], + [{pid(), term()}]}). --spec(process_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/0 :: () -> non_neg_integer()). -endif. @@ -61,157 +62,113 @@ %%---------------------------------------------------------------------------- -start_link(Prefix, Hash) -> - gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). +start_link(Num) -> + gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). +invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + Fun(Pid); invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), - case Res of - {ok, Result, _} -> + case invoke([Pid], Fun) of + {[{Pid, Result}], []} -> Result; - {error, {Class, Reason, StackTrace}, _} -> + {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; invoke(Pids, Fun) when is_list(Pids) -> - lists:foldl( - fun ({Status, Result, Pid}, {Good, Bad}) -> - case Status of - ok -> {[{Pid, Result}|Good], Bad}; - error -> {Good, [{Pid, Result}|Bad]} - end + {LocalPids, Grouped} = group_pids_by_node(Pids), + %% The use of multi_call is only safe because the timeout is + %% infinity, and thus there is no process spawned in order to do + %% the sending. Thus calls can't overtake preceding calls/casts. + {Replies, BadNodes} = + case orddict:fetch_keys(Grouped) of + [] -> {[], []}; + RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), + {invoke, Fun, Grouped}, + infinity) end, - {[], []}, - invoke_per_node(split_delegate_per_node(Pids), Fun)). + BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || + BadNode <- BadNodes, + Pid <- orddict:fetch(BadNode, Grouped)], + ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + [Results || {_Node, Results} <- Replies]]), + lists:foldl( + fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; + ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} + end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), +invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, Fun), %% we don't care about any error ok; +invoke_no_result(Pid, Fun) when is_pid(Pid) -> + invoke_no_result([Pid], Fun); invoke_no_result(Pids, Fun) when is_list(Pids) -> - invoke_no_result_per_node(split_delegate_per_node(Pids), Fun), + {LocalPids, Grouped} = group_pids_by_node(Pids), + case orddict:fetch_keys(Grouped) of + [] -> ok; + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + {invoke, Fun, Grouped}) + end, + safe_invoke(LocalPids, Fun), %% must not die ok. %%---------------------------------------------------------------------------- -internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity). - -internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). - -split_delegate_per_node(Pids) -> +group_pids_by_node(Pids) -> LocalNode = node(), - {Local, Remote} = - lists:foldl( - fun (Pid, {L, D}) -> - Node = node(Pid), - case Node of - LocalNode -> {[Pid|L], D}; - _ -> {L, orddict:append(Node, Pid, D)} - end - end, - {[], orddict:new()}, Pids), - {Local, orddict:to_list(Remote)}. - -invoke_per_node(NodePids, Fun) -> - lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). - -invoke_no_result_per_node(NodePids, Fun) -> - delegate_per_node(NodePids, Fun, fun internal_cast/2), - ok. - -delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> - %% In the case where DelegateFun is internal_cast, the safe_invoke - %% is not actually async! However, in practice Fun will always be - %% something that does a gen_server:cast or similar, so I don't - %% think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - [safe_invoke(LocalPids, Fun)| - delegate_per_remote_node(NodePids, Fun, DelegateFun)]. - -delegate_per_remote_node(NodePids, Fun, DelegateFun) -> - Self = self(), - %% Note that this is unsafe if the Fun requires reentrancy to the - %% local_server. I.e. if self() == local_server(Node) then we'll - %% block forever. - [gen_server2:cast( - local_server(Node), - {thunk, fun () -> - Self ! {result, - DelegateFun( - Node, fun () -> safe_invoke(Pids, Fun) end)} - end}) || {Node, Pids} <- NodePids], - [receive {result, Result} -> Result end || _ <- NodePids]. - -local_server(Node) -> - case get({delegate_local_server_name, Node}) of - undefined -> - Name = server(outgoing, - erlang:phash2({self(), Node}, process_count())), - put({delegate_local_server_name, Node}, Name), - Name; - Name -> Name - end. - -remote_server(Node) -> - case get({delegate_remote_server_name, Node}) of - undefined -> - case rpc:call(Node, delegate, process_count, []) of - {badrpc, _} -> - %% Have to return something, if we're just casting - %% then we don't want to blow up - server(incoming, 1); - Count -> - Name = server(incoming, - erlang:phash2({self(), Node}, Count)), - put({delegate_remote_server_name, Node}, Name), - Name - end; - Name -> Name + lists:foldl( + fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode -> + {[Pid | Local], Remote}; + (Pid, {Local, Remote}) -> + {Local, + orddict:update( + node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} + end, {[], orddict:new()}, Pids). + +delegate_count() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + Count. + +delegate_name(Hash) -> + list_to_atom("delegate_" ++ integer_to_list(Hash)). + +delegate() -> + case get(delegate) of + undefined -> Name = delegate_name( + erlang:phash2(self(), delegate_count())), + put(delegate, Name), + Name; + Name -> Name end. -server(Prefix, Hash) -> - list_to_atom("delegate_" ++ - atom_to_list(Prefix) ++ "_" ++ - integer_to_list(Hash)). - safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; safe_invoke(Pid, Fun) when is_pid(Pid) -> try - {ok, Fun(Pid), Pid} - catch - Class:Reason -> - {error, {Class, Reason, erlang:get_stacktrace()}, Pid} + {ok, Pid, Fun(Pid)} + catch Class:Reason -> + {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. -process_count() -> - ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). - -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- init([]) -> - {ok, no_state, hibernate, + {ok, node(), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -%% We don't need a catch here; we always go via safe_invoke. A catch here would -%% be the wrong thing anyway since the Thunk can throw multiple errors. -handle_call({thunk, Thunk}, _From, State) -> - {reply, Thunk(), State, hibernate}. +handle_call({invoke, Fun, Grouped}, _From, Node) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. -handle_cast({thunk, Thunk}, State) -> - Thunk(), - {noreply, State, hibernate}. +handle_cast({invoke, Fun, Grouped}, Node) -> + safe_invoke(orddict:fetch(Node, Grouped), Fun), + {noreply, Node, hibernate}. -handle_info(_Info, State) -> - {noreply, State, hibernate}. +handle_info(_Info, Node) -> + {noreply, Node, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- +code_change(_OldVsn, Node, _Extra) -> + {ok, Node}. |