summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl209
1 files changed, 83 insertions, 126 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 11abe73b..10054e57 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -31,11 +31,9 @@
-module(delegate).
--define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
-
-behaviour(gen_server2).
--export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -44,13 +42,16 @@
-ifdef(use_specs).
--spec(start_link/2 ::
- (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
+-spec(invoke/2 ::
+ ( pid(), fun ((pid()) -> A)) -> A;
+ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
--spec(process_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/0 :: () -> non_neg_integer()).
-endif.
@@ -61,157 +62,113 @@
%%----------------------------------------------------------------------------
-start_link(Prefix, Hash) ->
- gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []).
+start_link(Num) ->
+ gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ Fun(Pid);
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
- case Res of
- {ok, Result, _} ->
+ case invoke([Pid], Fun) of
+ {[{Pid, Result}], []} ->
Result;
- {error, {Class, Reason, StackTrace}, _} ->
+ {[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
invoke(Pids, Fun) when is_list(Pids) ->
- lists:foldl(
- fun ({Status, Result, Pid}, {Good, Bad}) ->
- case Status of
- ok -> {[{Pid, Result}|Good], Bad};
- error -> {Good, [{Pid, Result}|Bad]}
- end
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ %% The use of multi_call is only safe because the timeout is
+ %% infinity, and thus there is no process spawned in order to do
+ %% the sending. Thus calls can't overtake preceding calls/casts.
+ {Replies, BadNodes} =
+ case orddict:fetch_keys(Grouped) of
+ [] -> {[], []};
+ RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped},
+ infinity)
end,
- {[], []},
- invoke_per_node(split_delegate_per_node(Pids), Fun)).
+ BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
+ BadNode <- BadNodes,
+ Pid <- orddict:fetch(BadNode, Grouped)],
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ [Results || {_Node, Results} <- Replies]]),
+ lists:foldl(
+ fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
+ ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
+ end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
+invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, Fun), %% we don't care about any error
ok;
+invoke_no_result(Pid, Fun) when is_pid(Pid) ->
+ invoke_no_result([Pid], Fun);
invoke_no_result(Pids, Fun) when is_list(Pids) ->
- invoke_no_result_per_node(split_delegate_per_node(Pids), Fun),
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ case orddict:fetch_keys(Grouped) of
+ [] -> ok;
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped})
+ end,
+ safe_invoke(LocalPids, Fun), %% must not die
ok.
%%----------------------------------------------------------------------------
-internal_call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
-
-internal_cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
-
-split_delegate_per_node(Pids) ->
+group_pids_by_node(Pids) ->
LocalNode = node(),
- {Local, Remote} =
- lists:foldl(
- fun (Pid, {L, D}) ->
- Node = node(Pid),
- case Node of
- LocalNode -> {[Pid|L], D};
- _ -> {L, orddict:append(Node, Pid, D)}
- end
- end,
- {[], orddict:new()}, Pids),
- {Local, orddict:to_list(Remote)}.
-
-invoke_per_node(NodePids, Fun) ->
- lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-
-invoke_no_result_per_node(NodePids, Fun) ->
- delegate_per_node(NodePids, Fun, fun internal_cast/2),
- ok.
-
-delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
- %% In the case where DelegateFun is internal_cast, the safe_invoke
- %% is not actually async! However, in practice Fun will always be
- %% something that does a gen_server:cast or similar, so I don't
- %% think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- [safe_invoke(LocalPids, Fun)|
- delegate_per_remote_node(NodePids, Fun, DelegateFun)].
-
-delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
- Self = self(),
- %% Note that this is unsafe if the Fun requires reentrancy to the
- %% local_server. I.e. if self() == local_server(Node) then we'll
- %% block forever.
- [gen_server2:cast(
- local_server(Node),
- {thunk, fun () ->
- Self ! {result,
- DelegateFun(
- Node, fun () -> safe_invoke(Pids, Fun) end)}
- end}) || {Node, Pids} <- NodePids],
- [receive {result, Result} -> Result end || _ <- NodePids].
-
-local_server(Node) ->
- case get({delegate_local_server_name, Node}) of
- undefined ->
- Name = server(outgoing,
- erlang:phash2({self(), Node}, process_count())),
- put({delegate_local_server_name, Node}, Name),
- Name;
- Name -> Name
- end.
-
-remote_server(Node) ->
- case get({delegate_remote_server_name, Node}) of
- undefined ->
- case rpc:call(Node, delegate, process_count, []) of
- {badrpc, _} ->
- %% Have to return something, if we're just casting
- %% then we don't want to blow up
- server(incoming, 1);
- Count ->
- Name = server(incoming,
- erlang:phash2({self(), Node}, Count)),
- put({delegate_remote_server_name, Node}, Name),
- Name
- end;
- Name -> Name
+ lists:foldl(
+ fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode ->
+ {[Pid | Local], Remote};
+ (Pid, {Local, Remote}) ->
+ {Local,
+ orddict:update(
+ node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
+ end, {[], orddict:new()}, Pids).
+
+delegate_count() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ Count.
+
+delegate_name(Hash) ->
+ list_to_atom("delegate_" ++ integer_to_list(Hash)).
+
+delegate() ->
+ case get(delegate) of
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(), delegate_count())),
+ put(delegate, Name),
+ Name;
+ Name -> Name
end.
-server(Prefix, Hash) ->
- list_to_atom("delegate_" ++
- atom_to_list(Prefix) ++ "_" ++
- integer_to_list(Hash)).
-
safe_invoke(Pids, Fun) when is_list(Pids) ->
[safe_invoke(Pid, Fun) || Pid <- Pids];
safe_invoke(Pid, Fun) when is_pid(Pid) ->
try
- {ok, Fun(Pid), Pid}
- catch
- Class:Reason ->
- {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
+ {ok, Pid, Fun(Pid)}
+ catch Class:Reason ->
+ {error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
-process_count() ->
- ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
-
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, no_state, hibernate,
+ {ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-%% We don't need a catch here; we always go via safe_invoke. A catch here would
-%% be the wrong thing anyway since the Thunk can throw multiple errors.
-handle_call({thunk, Thunk}, _From, State) ->
- {reply, Thunk(), State, hibernate}.
+handle_call({invoke, Fun, Grouped}, _From, Node) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-handle_cast({thunk, Thunk}, State) ->
- Thunk(),
- {noreply, State, hibernate}.
+handle_cast({invoke, Fun, Grouped}, Node) ->
+ safe_invoke(orddict:fetch(Node, Grouped), Fun),
+ {noreply, Node, hibernate}.
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
+handle_info(_Info, Node) ->
+ {noreply, Node, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
+code_change(_OldVsn, Node, _Extra) ->
+ {ok, Node}.