diff options
author | Simon MacMullen <simon@lshift.net> | 2010-03-17 13:47:44 +0000 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-03-17 13:47:44 +0000 |
commit | 929f43f01c183a9e0fbc753952da4d9f640b08e7 (patch) | |
tree | 9ddf4ff2d6e8dfacbcfb285568b0f6a5a2724bb0 /src | |
parent | 9fe32fe62e9acb0373ed369365a7a1880571987f (diff) | |
download | rabbitmq-server-929f43f01c183a9e0fbc753952da4d9f640b08e7.tar.gz |
Generic delegate mechanism, similar to what the router did before.
Diffstat (limited to 'src')
-rw-r--r-- | src/delegate.erl | 129 | ||||
-rw-r--r-- | src/delegate_sup.erl | 56 | ||||
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 90 |
4 files changed, 283 insertions, 0 deletions
diff --git a/src/delegate.erl b/src/delegate.erl new file mode 100644 index 00000000..3a26c410 --- /dev/null +++ b/src/delegate.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate). +-include("delegate.hrl"). + +-behaviour(gen_server2). + +-export([start_link/1, delegate_async/2, delegate_sync/2, server/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + + +%%---------------------------------------------------------------------------- + +start_link(Hash) -> + gen_server2:start_link({local, server(Hash)}, + ?MODULE, [], []). +delegate_sync(Node, Thunk) when is_atom(Node) -> + gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); + +delegate_sync(Pid, FPid) when is_pid(Pid) -> + [[Res]] = delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_sync/2, FPid)), + Res; + +delegate_sync(Pids, FPid) when is_list(Pids) -> + lists:flatten( + delegate_per_node(split_per_node(Pids), + f_pid_node(fun delegate_sync/2, FPid))). + +delegate_async(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({server(), Node}, {thunk, Thunk}); + +delegate_async(Pid, FPid) when is_pid(Pid) -> + delegate_per_node([{node(Pid), [Pid]}], + f_pid_node(fun delegate_async/2, FPid)); + +delegate_async(Pids, FPid) when is_list(Pids) -> + delegate_per_node(split_per_node(Pids), + f_pid_node(fun delegate_async/2, FPid)). + +%%---------------------------------------------------------------------------- + +split_per_node(Pids) -> + dict:to_list( + lists:foldl( + fun (Pid, D) -> + dict:update(node(Pid), + fun (Pids1) -> [Pid | Pids1] end, + [Pid], D) + end, + dict:new(), Pids)). + +f_pid_node(DelegateFun, FPid) -> + fun(Pid, Node) -> + DelegateFun(Node, fun() -> FPid(Pid) end) + end. + +% TODO this only gets called when we are ONLY talking to the local node - can +% we improve this? +delegate_per_node([{Node, Pids}], FPidNode) when Node == node() -> + % optimisation + [[FPidNode(Pid, node()) || Pid <- Pids]]; + +delegate_per_node(NodePids, FPidNode) -> + rabbit_misc:upmap( + fun ({Node, Pids}) -> + [FPidNode(Pid, Node) || Pid <- Pids] + end, + NodePids). + +server() -> + server(erlang:phash(self(), ?DELEGATE_PROCESSES)). + +server(Hash) -> + list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))). + +%%-------------------------------------------------------------------- + +init([]) -> + {ok, no_state}. + +handle_call({thunk, Thunk}, _From, State) -> + {reply, catch Thunk(), State}. + +handle_cast({thunk, Thunk}, State) -> + catch Thunk(), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl new file mode 100644 index 00000000..f7a1c586 --- /dev/null +++ b/src/delegate_sup.erl @@ -0,0 +1,56 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate_sup). +-include("delegate.hrl"). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%-------------------------------------------------------------------- + +init(_Args) -> + {ok, {{one_for_one, 10, 10}, + [{delegate:server(Hash), {delegate, start_link, [Hash]}, + temporary, 16#ffffffff, worker, [delegate]} || + Hash <- lists:seq(1, ?DELEGATE_PROCESSES)]}}. + +%%-------------------------------------------------------------------- diff --git a/src/rabbit.erl b/src/rabbit.erl index 700acede..693731f9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -85,6 +85,14 @@ {requires, kernel_ready}, {enables, core_initialized}]}). + +-rabbit_boot_step({delegate_sup, + [{description, "cluster delegate"}, + {mfa, {rabbit_sup, start_restartable_child, + [delegate_sup]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 82f2d199..29ec7999 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -62,6 +62,8 @@ all_tests() -> passed = test_user_management(), passed = test_server_status(), passed = test_hooks(), + passed = test_delegates_async(), + passed = test_delegates_sync(), passed. test_priority_queue() -> @@ -811,6 +813,94 @@ test_hooks() -> end, passed. +test_delegates_async() -> + SecondaryNode = rabbit_misc:makenode("hare"), + + Self = self(), + Sender = fun(Pid) -> Pid ! {invoked, Self} end, + + Receiver = fun() -> + receive + {invoked, Pid} -> + Pid ! response, + ok + after 100 -> + io:format("Async message not sent~n"), + throw(timeout) + end + end, + + delegate:delegate_async(spawn(Receiver), Sender), + delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender), + await_response(2), + + LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)], + RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)], + delegate:delegate_async(LocalPids ++ RemotePids, Sender), + await_response(20), + + passed. + +await_response(0) -> + ok; + +await_response(Count) -> + receive + response -> ok, + await_response(Count - 1) + after 100 -> + io:format("Async reply not received~n"), + throw(timeout) + end. + +test_delegates_sync() -> + SecondaryNode = rabbit_misc:makenode("hare"), + "foo" = delegate:delegate_sync(node(), fun() -> "foo" end), + "bar" = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end), + + Sender = fun(Pid) -> + gen_server2:call(Pid, invoked) + end, + + Responder = fun() -> + receive + {'$gen_call', From, invoked} -> + gen_server2:reply(From, response) + after 100 -> + io:format("Sync hook not invoked~n"), + throw(timeout) + end + end, + + BadResponder = fun() -> + receive + {'$gen_call', _From, invoked} -> + throw(exception) + after 100 -> + io:format("Crashing sync hook not invoked~n"), + throw(timeout) + end + end, + + response = delegate:delegate_sync(spawn(Responder), Sender), + response = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender), + + {'EXIT', _} = delegate:delegate_sync(spawn(BadResponder), Sender), + {'EXIT', _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender), + + LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], + RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)], + LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], + RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)], + + [response, response, response, response] = + delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender), + [{'EXIT', _}, {'EXIT', _}, {'EXIT', _}, {'EXIT', _}] = + delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender), + + passed. + + %--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args). |