diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-08 14:56:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-08 14:56:31 +0100 |
commit | 96b1c38adb4aa3a0b24922bf384d9140b19a215f (patch) | |
tree | 34b025ca5cbf5f829cacfc8b822c73e638c48116 | |
parent | 22c34ea12a1a5af08012b05e17873dc812de93ff (diff) | |
download | rabbitmq-server-96b1c38adb4aa3a0b24922bf384d9140b19a215f.tar.gz |
Support MFA in delegate:invoke/2 and friends.
-rw-r--r-- | src/delegate.erl | 73 |
1 files changed, 39 insertions, 34 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 7a06c1e4..9a1f6886 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -33,15 +33,14 @@ -export_type([monitor_ref/0]). -type(monitor_ref() :: reference() | {atom(), pid()}). +-type(fun_or_mfa() :: fun ((pid()) -> any()) | {atom(), atom(), [any()]}). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). --spec(invoke/2 :: - ( pid(), fun ((pid()) -> A)) -> A; - ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], - [{pid(), term()}]}). --spec(invoke_no_result/2 :: - (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: ( pid(), fun_or_mfa()) -> any(); + ([pid()], fun_or_mfa()) -> {[{pid(), any()}], + [{pid(), term()}]}). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa()) -> 'ok'). -spec(monitor/2 :: ('process', pid()) -> monitor_ref()). -spec(demonitor/1 :: (monitor_ref()) -> 'true'). -spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true'). @@ -64,24 +63,24 @@ start_link(Num) -> Name = delegate_name(Num), gen_server2:start_link({local, Name}, ?MODULE, [Name], []). -invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - Fun(Pid); -invoke(Pid, Fun) when is_pid(Pid) -> - case invoke([Pid], Fun) of +invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + fun_or_mfa(FunOrMFA, Pid); +invoke(Pid, FunOrMFA) when is_pid(Pid) -> + case invoke([Pid], FunOrMFA) of {[{Pid, Result}], []} -> Result; {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; -invoke([], _Fun) -> %% optimisation +invoke([], _FunOrMFA) -> %% optimisation {[], []}; -invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, Fun) of +invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of {ok, _, Result} -> {[{Pid, Result}], []}; {error, _, Error} -> {[], [{Pid, Error}]} end; -invoke(Pids, Fun) when is_list(Pids) -> +invoke(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), %% The use of multi_call is only safe because the timeout is %% infinity, and thus there is no process spawned in order to do @@ -91,38 +90,38 @@ invoke(Pids, Fun) when is_list(Pids) -> [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}, infinity) + {invoke, FunOrMFA, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | [Results || {_Node, Results} <- Replies]]), lists:foldl( fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - safe_invoke(Pid, Fun), %% we don't care about any error +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, FunOrMFA), %% we don't care about any error ok; -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result([Pid], Fun); +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> + invoke_no_result([Pid], FunOrMFA); -invoke_no_result([], _Fun) -> %% optimisation +invoke_no_result([], _FunOrMFA) -> %% optimisation ok; -invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - safe_invoke(Pid, Fun), %% must not die +invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + safe_invoke(Pid, FunOrMFA), %% must not die ok; -invoke_no_result(Pids, Fun) when is_list(Pids) -> +invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; RemoteNodes -> gen_server2:abcast( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}) + {invoke, FunOrMFA, Grouped}) end, - safe_invoke(LocalPids, Fun), %% must not die + safe_invoke(LocalPids, FunOrMFA), %% must not die ok. monitor(Type, Pid) when node(Pid) =:= node() -> @@ -171,23 +170,29 @@ delegate(Pid, RemoteNodes) -> Name -> Name end. -safe_invoke(Pids, Fun) when is_list(Pids) -> - [safe_invoke(Pid, Fun) || Pid <- Pids]; -safe_invoke(Pid, Fun) when is_pid(Pid) -> +safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> + [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; +safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> try - {ok, Pid, Fun(Pid)} + {ok, Pid, fun_or_mfa(FunOrMFA, Pid)} catch Class:Reason -> {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. +fun_or_mfa(Fun, Pid) when is_function(Fun) -> + Fun(Pid); +fun_or_mfa({M, F, A}, Pid) -> + apply(M, F, [Pid | A]). + %%---------------------------------------------------------------------------- init([Name]) -> {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}. +handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, + hibernate}. handle_cast({monitor, Type, WantsMonitor, Pid}, State = #state{monitors = Monitors}) -> @@ -205,8 +210,8 @@ handle_cast({demonitor, Pid, Options}, State end, hibernate}; -handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> - safe_invoke(orddict:fetch(Node, Grouped), Fun), +handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> + safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), {noreply, State, hibernate}. handle_info({'DOWN', Ref, process, Pid, Info}, |