summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-03-17 13:47:44 +0000
committerSimon MacMullen <simon@lshift.net>2010-03-17 13:47:44 +0000
commit929f43f01c183a9e0fbc753952da4d9f640b08e7 (patch)
tree9ddf4ff2d6e8dfacbcfb285568b0f6a5a2724bb0
parent9fe32fe62e9acb0373ed369365a7a1880571987f (diff)
downloadrabbitmq-server-929f43f01c183a9e0fbc753952da4d9f640b08e7.tar.gz
Generic delegate mechanism, similar to what the router did before.
-rw-r--r--include/delegate.hrl32
-rw-r--r--src/delegate.erl129
-rw-r--r--src/delegate_sup.erl56
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_tests.erl90
5 files changed, 315 insertions, 0 deletions
diff --git a/include/delegate.hrl b/include/delegate.hrl
new file mode 100644
index 00000000..38f8d42f
--- /dev/null
+++ b/include/delegate.hrl
@@ -0,0 +1,32 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-define(DELEGATE_PROCESSES, 10).
diff --git a/src/delegate.erl b/src/delegate.erl
new file mode 100644
index 00000000..3a26c410
--- /dev/null
+++ b/src/delegate.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate).
+-include("delegate.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/1, delegate_async/2, delegate_sync/2, server/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+
+%%----------------------------------------------------------------------------
+
+start_link(Hash) ->
+ gen_server2:start_link({local, server(Hash)},
+ ?MODULE, [], []).
+delegate_sync(Node, Thunk) when is_atom(Node) ->
+ gen_server2:call({server(), Node}, {thunk, Thunk}, infinity);
+
+delegate_sync(Pid, FPid) when is_pid(Pid) ->
+ [[Res]] = delegate_per_node([{node(Pid), [Pid]}],
+ f_pid_node(fun delegate_sync/2, FPid)),
+ Res;
+
+delegate_sync(Pids, FPid) when is_list(Pids) ->
+ lists:flatten(
+ delegate_per_node(split_per_node(Pids),
+ f_pid_node(fun delegate_sync/2, FPid))).
+
+delegate_async(Node, Thunk) when is_atom(Node) ->
+ gen_server2:cast({server(), Node}, {thunk, Thunk});
+
+delegate_async(Pid, FPid) when is_pid(Pid) ->
+ delegate_per_node([{node(Pid), [Pid]}],
+ f_pid_node(fun delegate_async/2, FPid));
+
+delegate_async(Pids, FPid) when is_list(Pids) ->
+ delegate_per_node(split_per_node(Pids),
+ f_pid_node(fun delegate_async/2, FPid)).
+
+%%----------------------------------------------------------------------------
+
+split_per_node(Pids) ->
+ dict:to_list(
+ lists:foldl(
+ fun (Pid, D) ->
+ dict:update(node(Pid),
+ fun (Pids1) -> [Pid | Pids1] end,
+ [Pid], D)
+ end,
+ dict:new(), Pids)).
+
+f_pid_node(DelegateFun, FPid) ->
+ fun(Pid, Node) ->
+ DelegateFun(Node, fun() -> FPid(Pid) end)
+ end.
+
+% TODO this only gets called when we are ONLY talking to the local node - can
+% we improve this?
+delegate_per_node([{Node, Pids}], FPidNode) when Node == node() ->
+ % optimisation
+ [[FPidNode(Pid, node()) || Pid <- Pids]];
+
+delegate_per_node(NodePids, FPidNode) ->
+ rabbit_misc:upmap(
+ fun ({Node, Pids}) ->
+ [FPidNode(Pid, Node) || Pid <- Pids]
+ end,
+ NodePids).
+
+server() ->
+ server(erlang:phash(self(), ?DELEGATE_PROCESSES)).
+
+server(Hash) ->
+ list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))).
+
+%%--------------------------------------------------------------------
+
+init([]) ->
+ {ok, no_state}.
+
+handle_call({thunk, Thunk}, _From, State) ->
+ {reply, catch Thunk(), State}.
+
+handle_cast({thunk, Thunk}, State) ->
+ catch Thunk(),
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
new file mode 100644
index 00000000..f7a1c586
--- /dev/null
+++ b/src/delegate_sup.erl
@@ -0,0 +1,56 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate_sup).
+-include("delegate.hrl").
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%--------------------------------------------------------------------
+
+init(_Args) ->
+ {ok, {{one_for_one, 10, 10},
+ [{delegate:server(Hash), {delegate, start_link, [Hash]},
+ temporary, 16#ffffffff, worker, [delegate]} ||
+ Hash <- lists:seq(1, ?DELEGATE_PROCESSES)]}}.
+
+%%--------------------------------------------------------------------
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 700acede..693731f9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -85,6 +85,14 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
+
+-rabbit_boot_step({delegate_sup,
+ [{description, "cluster delegate"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [delegate_sup]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({rabbit_router,
[{description, "cluster router"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 82f2d199..29ec7999 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -62,6 +62,8 @@ all_tests() ->
passed = test_user_management(),
passed = test_server_status(),
passed = test_hooks(),
+ passed = test_delegates_async(),
+ passed = test_delegates_sync(),
passed.
test_priority_queue() ->
@@ -811,6 +813,94 @@ test_hooks() ->
end,
passed.
+test_delegates_async() ->
+ SecondaryNode = rabbit_misc:makenode("hare"),
+
+ Self = self(),
+ Sender = fun(Pid) -> Pid ! {invoked, Self} end,
+
+ Receiver = fun() ->
+ receive
+ {invoked, Pid} ->
+ Pid ! response,
+ ok
+ after 100 ->
+ io:format("Async message not sent~n"),
+ throw(timeout)
+ end
+ end,
+
+ delegate:delegate_async(spawn(Receiver), Sender),
+ delegate:delegate_async(spawn(SecondaryNode, Receiver), Sender),
+ await_response(2),
+
+ LocalPids = [spawn(Receiver) || _ <- lists:seq(1,10)],
+ RemotePids = [spawn(SecondaryNode, Receiver) || _ <- lists:seq(1,10)],
+ delegate:delegate_async(LocalPids ++ RemotePids, Sender),
+ await_response(20),
+
+ passed.
+
+await_response(0) ->
+ ok;
+
+await_response(Count) ->
+ receive
+ response -> ok,
+ await_response(Count - 1)
+ after 100 ->
+ io:format("Async reply not received~n"),
+ throw(timeout)
+ end.
+
+test_delegates_sync() ->
+ SecondaryNode = rabbit_misc:makenode("hare"),
+ "foo" = delegate:delegate_sync(node(), fun() -> "foo" end),
+ "bar" = delegate:delegate_sync(SecondaryNode, fun() -> "bar" end),
+
+ Sender = fun(Pid) ->
+ gen_server2:call(Pid, invoked)
+ end,
+
+ Responder = fun() ->
+ receive
+ {'$gen_call', From, invoked} ->
+ gen_server2:reply(From, response)
+ after 100 ->
+ io:format("Sync hook not invoked~n"),
+ throw(timeout)
+ end
+ end,
+
+ BadResponder = fun() ->
+ receive
+ {'$gen_call', _From, invoked} ->
+ throw(exception)
+ after 100 ->
+ io:format("Crashing sync hook not invoked~n"),
+ throw(timeout)
+ end
+ end,
+
+ response = delegate:delegate_sync(spawn(Responder), Sender),
+ response = delegate:delegate_sync(spawn(SecondaryNode, Responder), Sender),
+
+ {'EXIT', _} = delegate:delegate_sync(spawn(BadResponder), Sender),
+ {'EXIT', _} = delegate:delegate_sync(spawn(SecondaryNode, BadResponder), Sender),
+
+ LocalGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)],
+ RemoteGoodPids = [spawn(Responder) || _ <- lists:seq(1,2)],
+ LocalBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)],
+ RemoteBadPids = [spawn(SecondaryNode, BadResponder) || _ <- lists:seq(1,2)],
+
+ [response, response, response, response] =
+ delegate:delegate_sync(LocalGoodPids ++ RemoteGoodPids, Sender),
+ [{'EXIT', _}, {'EXIT', _}, {'EXIT', _}, {'EXIT', _}] =
+ delegate:delegate_sync(LocalBadPids ++ RemoteBadPids, Sender),
+
+ passed.
+
+
%---------------------------------------------------------------------
control_action(Command, Args) -> control_action(Command, node(), Args).