summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-07 16:09:06 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-07 16:09:06 +0000
commit4cd34e45f12e31ef0090b0033c55be419cf47d69 (patch)
tree8619b6828dc531f0cc9bd5b559a04a97a66b5c43
parent709ed272ed055c1a6473f577cfffe772d040c55a (diff)
downloadrabbitmq-server-4cd34e45f12e31ef0090b0033c55be419cf47d69.tar.gz
Abstract out the rabbit from delegate (also improve robustness)
-rw-r--r--src/delegate.erl18
-rw-r--r--src/delegate_sup.erl32
-rw-r--r--src/rabbit.erl3
3 files changed, 30 insertions, 23 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 46bd8245..17046201 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]).
+-export([start_link/1, invoke_no_result/2, invoke/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,8 +36,6 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-
-endif.
%%----------------------------------------------------------------------------
@@ -111,22 +109,14 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count([RemoteNode | _]) ->
- {ok, Count} = case application:get_env(rabbit, delegate_count) of
- undefined -> rpc:call(RemoteNode, application, get_env,
- [rabbit, delegate_count]);
- Result -> Result
- end,
- Count.
-
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
delegate(RemoteNodes) ->
case get(delegate) of
- undefined -> Name =
- delegate_name(erlang:phash2(
- self(), delegate_count(RemoteNodes))),
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(),
+ delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index e0ffa7c8..96515ff4 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -18,7 +18,8 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/1, count/1]).
+-export([boot/0]).
-export([init/1]).
@@ -28,20 +29,37 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(count/1 :: ([node()]) -> integer()).
+
+-spec(boot/0 :: () -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link(Count) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
+
+count([]) ->
+ 1;
+count([Node | Nodes]) ->
+ try
+ length(supervisor:which_children({?SERVER, Node}))
+ catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
+ count(Nodes);
+ exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown ->
+ count(Nodes)
+ end.
+
+boot() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ rabbit_sup:start_child(delegate_sup, [Count]).
%%----------------------------------------------------------------------------
-init(_Args) ->
- DCount = delegate:delegate_count([node()]),
+init([Count]) ->
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, DCount - 1)]}}.
+ Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c6661d39..101c97f7 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -101,8 +101,7 @@
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
- {mfa, {rabbit_sup, start_child,
- [delegate_sup]}},
+ {mfa, {delegate_sup, boot, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).