summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-03-17 13:47:44 +0000
committerSimon MacMullen <simon@lshift.net>2010-03-17 13:47:44 +0000
commit929f43f01c183a9e0fbc753952da4d9f640b08e7 (patch)
tree9ddf4ff2d6e8dfacbcfb285568b0f6a5a2724bb0 /src/delegate.erl
parent9fe32fe62e9acb0373ed369365a7a1880571987f (diff)
downloadrabbitmq-server-929f43f01c183a9e0fbc753952da4d9f640b08e7.tar.gz
Generic delegate mechanism, similar to what the router did before.
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl129
1 files changed, 129 insertions, 0 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
new file mode 100644
index 00000000..3a26c410
--- /dev/null
+++ b/src/delegate.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate).
+-include("delegate.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/1, delegate_async/2, delegate_sync/2, server/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+
+%%----------------------------------------------------------------------------
+
+start_link(Hash) ->
+ gen_server2:start_link({local, server(Hash)},
+ ?MODULE, [], []).
+delegate_sync(Node, Thunk) when is_atom(Node) ->
+ gen_server2:call({server(), Node}, {thunk, Thunk}, infinity);
+
+delegate_sync(Pid, FPid) when is_pid(Pid) ->
+ [[Res]] = delegate_per_node([{node(Pid), [Pid]}],
+ f_pid_node(fun delegate_sync/2, FPid)),
+ Res;
+
+delegate_sync(Pids, FPid) when is_list(Pids) ->
+ lists:flatten(
+ delegate_per_node(split_per_node(Pids),
+ f_pid_node(fun delegate_sync/2, FPid))).
+
+delegate_async(Node, Thunk) when is_atom(Node) ->
+ gen_server2:cast({server(), Node}, {thunk, Thunk});
+
+delegate_async(Pid, FPid) when is_pid(Pid) ->
+ delegate_per_node([{node(Pid), [Pid]}],
+ f_pid_node(fun delegate_async/2, FPid));
+
+delegate_async(Pids, FPid) when is_list(Pids) ->
+ delegate_per_node(split_per_node(Pids),
+ f_pid_node(fun delegate_async/2, FPid)).
+
+%%----------------------------------------------------------------------------
+
+split_per_node(Pids) ->
+ dict:to_list(
+ lists:foldl(
+ fun (Pid, D) ->
+ dict:update(node(Pid),
+ fun (Pids1) -> [Pid | Pids1] end,
+ [Pid], D)
+ end,
+ dict:new(), Pids)).
+
+f_pid_node(DelegateFun, FPid) ->
+ fun(Pid, Node) ->
+ DelegateFun(Node, fun() -> FPid(Pid) end)
+ end.
+
+% TODO this only gets called when we are ONLY talking to the local node - can
+% we improve this?
+delegate_per_node([{Node, Pids}], FPidNode) when Node == node() ->
+ % optimisation
+ [[FPidNode(Pid, node()) || Pid <- Pids]];
+
+delegate_per_node(NodePids, FPidNode) ->
+ rabbit_misc:upmap(
+ fun ({Node, Pids}) ->
+ [FPidNode(Pid, Node) || Pid <- Pids]
+ end,
+ NodePids).
+
+server() ->
+ server(erlang:phash(self(), ?DELEGATE_PROCESSES)).
+
+server(Hash) ->
+ list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))).
+
+%%--------------------------------------------------------------------
+
+init([]) ->
+ {ok, no_state}.
+
+handle_call({thunk, Thunk}, _From, State) ->
+ {reply, catch Thunk(), State}.
+
+handle_cast({thunk, Thunk}, State) ->
+ catch Thunk(),
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------