diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-08 13:36:42 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-08 13:36:42 +0000 |
commit | 79b65907112277b1574c022ec9772eb55b63b288 (patch) | |
tree | 5bd137e34dedf2ff5c8b3c22615a3c63a2580606 | |
parent | 5aecb87648ff3ad0f3f7714e3c8d0368e7a163cd (diff) | |
parent | 73d09509763dc07a6fb1f0aaa4ff40c048133637 (diff) | |
download | rabbitmq-server-79b65907112277b1574c022ec9772eb55b63b288.tar.gz |
Merging bug22698 to default
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 53 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 40 |
4 files changed, 88 insertions, 15 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c6661d39..67e2e40f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -153,6 +153,11 @@ [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({notify_cluster, + [{description, "notify cluster nodes"}, + {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {requires, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). @@ -225,11 +230,11 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, SupPid} = rabbit_sup:start_link(), + true = register(rabbit, self()), print_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), - {ok, SupPid}; Error -> Error diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a6da551d..2545b07c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -356,7 +356,8 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> + delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index e4bc1cdc..817abaa2 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,14 +22,41 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([notify_cluster/0, rabbit_running_on/1]). -define(SERVER, ?MODULE). +-define(RABBIT_UP_RPC_TIMEOUT, 2000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(rabbit_running_on/1 :: (node()) -> 'ok'). +-spec(notify_cluster/0 :: () -> 'ok'). + +-endif. %%-------------------------------------------------------------------- start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +rabbit_running_on(Node) -> + gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). + +notify_cluster() -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this rabbit + case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, + [Node], ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + %% register other active rabbits with this rabbit + [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ], + ok. + %%-------------------------------------------------------------------- init([]) -> @@ -39,19 +66,20 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({rabbit_running_on, Node}, State) -> + rabbit_log:info("node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({nodeup, Node}, State) -> - rabbit_log:info("node ~p up", [Node]), - {noreply, State}; handle_info({nodedown, Node}, State) -> - rabbit_log:info("node ~p down", [Node]), - %% TODO: This may turn out to be a performance hog when there are - %% lots of nodes. We really only need to execute this code on - %% *one* node, rather than all of them. - ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node), + rabbit_log:info("node ~p down~n", [Node]), + ok = handle_dead_rabbit(Node), + {noreply, State}; +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> + rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), + ok = handle_dead_rabbit(Node), {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- +%% TODO: This may turn out to be a performance hog when there are +%% lots of nodes. We really only need to execute this code on +%% *one* node, rather than all of them. +handle_dead_rabbit(Node) -> + ok = rabbit_networking:on_node_down(Node), + ok = rabbit_amqqueue:on_node_down(Node). + diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49b09508..ddb53b15 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -26,6 +26,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -80,19 +81,21 @@ run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + passed = test_queue_cleanup(SecondaryNode), %% we now run the tests remotely, so that code coverage on the %% local node picks up more of the delegate Node = node(), Self = self(), Remote = spawn(SecondaryNode, - fun () -> A = test_delegates_async(Node), - B = test_delegates_sync(Node), - Self ! {self(), {A, B}} + fun () -> Rs = [ test_delegates_async(Node), + test_delegates_sync(Node), + test_queue_cleanup(Node) ], + Self ! {self(), Rs} end), receive {Remote, Result} -> - Result = {passed, passed} + Result = [passed, passed, passed] after 2000 -> throw(timeout) end, @@ -1278,6 +1281,35 @@ test_delegates_sync(SecondaryNode) -> passed. +test_queue_cleanup_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_queue_cleanup_receiver(Pid) + end. + + +test_queue_cleanup(_SecondaryNode) -> + {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), + receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> + ok + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + rabbit:stop(), + rabbit:start(), + rabbit_channel:do(Ch, #'queue.declare'{ passive = true, + queue = ?CLEANUP_QUEUE_NAME }), + receive + {channel_exit, 1, {amqp_error, not_found, _, _}} -> + ok + after 2000 -> + throw(failed_to_receive_channel_exit) + end, + passed. + %--------------------------------------------------------------------- control_action(Command, Args) -> |