diff options
author | Simon MacMullen <simon@lshift.net> | 2010-03-17 13:47:44 +0000 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-03-17 13:47:44 +0000 |
commit | 929f43f01c183a9e0fbc753952da4d9f640b08e7 (patch) | |
tree | 9ddf4ff2d6e8dfacbcfb285568b0f6a5a2724bb0 /src/delegate.erl | |
parent | 9fe32fe62e9acb0373ed369365a7a1880571987f (diff) | |
download | rabbitmq-server-929f43f01c183a9e0fbc753952da4d9f640b08e7.tar.gz |
Generic delegate mechanism, similar to what the router did before.
Diffstat (limited to 'src/delegate.erl')
-rw-r--r-- | src/delegate.erl | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/src/delegate.erl b/src/delegate.erl new file mode 100644 index 00000000..3a26c410 --- /dev/null +++ b/src/delegate.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate). +-include("delegate.hrl"). + +-behaviour(gen_server2). + +-export([start_link/1, delegate_async/2, delegate_sync/2, server/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + + +%%---------------------------------------------------------------------------- + +start_link(Hash) -> + gen_server2:start_link({local, server(Hash)}, + ?MODULE, [], []). +delegate_sync(Node, Thunk) when is_atom(Node) -> + gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); + +delegate_sync(Pid, FPid) when is_pid(Pid) -> + [[Res]] = delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_sync/2, FPid)), + Res; + +delegate_sync(Pids, FPid) when is_list(Pids) -> + lists:flatten( + delegate_per_node(split_per_node(Pids), + f_pid_node(fun delegate_sync/2, FPid))). + +delegate_async(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({server(), Node}, {thunk, Thunk}); + +delegate_async(Pid, FPid) when is_pid(Pid) -> + delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_async/2, FPid)); + +delegate_async(Pids, FPid) when is_list(Pids) -> + delegate_per_node(split_per_node(Pids), + f_pid_node(fun delegate_async/2, FPid)). + +%%---------------------------------------------------------------------------- + +split_per_node(Pids) -> + dict:to_list( + lists:foldl( + fun (Pid, D) -> + dict:update(node(Pid), + fun (Pids1) -> [Pid | Pids1] end, + [Pid], D) + end, + dict:new(), Pids)). + +f_pid_node(DelegateFun, FPid) -> + fun(Pid, Node) -> + DelegateFun(Node, fun() -> FPid(Pid) end) + end. + +% TODO this only gets called when we are ONLY talking to the local node - can +% we improve this? +delegate_per_node([{Node, Pids}], FPidNode) when Node == node() -> + % optimisation + [[FPidNode(Pid, node()) || Pid <- Pids]]; + +delegate_per_node(NodePids, FPidNode) -> + rabbit_misc:upmap( + fun ({Node, Pids}) -> + [FPidNode(Pid, Node) || Pid <- Pids] + end, + NodePids). + +server() -> + server(erlang:phash(self(), ?DELEGATE_PROCESSES)). + +server(Hash) -> + list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))). + +%%-------------------------------------------------------------------- + +init([]) -> + {ok, no_state}. + +handle_call({thunk, Thunk}, _From, State) -> + {reply, catch Thunk(), State}. + +handle_cast({thunk, Thunk}, State) -> + catch Thunk(), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- |