path: root/src
diff options
authorSimon MacMullen <>2011-02-10 10:51:26 +0000
committerSimon MacMullen <>2011-02-10 10:51:26 +0000
commit36e0842143bc26c87af6cee699977dd416194cb4 (patch)
tree66bc7effc428e16a436b6466368e50b7b589ce19 /src
parentfbc8e8a3be7671c02d73b7af0f9342fcf2757b5e (diff)
parenta1460c9cc3b1ccce1f784b37466180da3d0d4f02 (diff)
Merge bug23813 (abstract rabbit out of delegate)
Diffstat (limited to 'src')
8 files changed, 164 insertions, 48 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 81e0e54b..1beed5c1 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -152,6 +152,11 @@
[{mfa, {rabbit_networking, boot, []}},
{requires, log_relay}]}).
+ [{description, "notify cluster nodes"},
+ {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {requires, networking}]}).
@@ -227,11 +232,11 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, SupPid} = rabbit_sup:start_link(),
+ true = register(rabbit, self()),
[ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
{ok, SupPid};
Error ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a6da551d..1d423809 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
- case is_process_alive(QPid) of
+ case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
fun (Tx) -> TailFun(Tx), ExistingQ end
@@ -356,7 +356,8 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
-stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) ->
+ delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c7e28fe..496b2064 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -790,20 +790,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
handle_call(info, _From, State) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 7d916797..abc27c5f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
@@ -194,6 +195,7 @@
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
@@ -350,8 +352,11 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
- catch exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
+ catch
+ exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
+ Handler();
+ exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
@@ -858,3 +863,12 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 283d25c7..36f61628 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -32,16 +32,6 @@
--define(RABBIT_TCP_OPTS, [
- binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- {backlog, 128}, % use the maximum listen(2) backlog value
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}
- ]).
-define(SSL_TIMEOUT, 5). %% seconds
-define(FIRST_TEST_BIND_PORT, 10000).
@@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
{tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS],
+ [IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
@@ -315,6 +305,10 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+tcp_opts() ->
+ {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
+ Opts.
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index e4bc1cdc..817abaa2 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -22,14 +22,41 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([notify_cluster/0, rabbit_running_on/1]).
-define(SERVER, ?MODULE).
+-define(RABBIT_UP_RPC_TIMEOUT, 2000).
+-spec(rabbit_running_on/1 :: (node()) -> 'ok').
+-spec(notify_cluster/0 :: () -> 'ok').
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+rabbit_running_on(Node) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+notify_cluster() ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ %% notify other rabbits of this rabbit
+ case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ %% register other active rabbits with this rabbit
+ [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
+ ok.
init([]) ->
@@ -39,19 +66,20 @@ init([]) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast({rabbit_running_on, Node}, State) ->
+ rabbit_log:info("node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({nodeup, Node}, State) ->
- rabbit_log:info("node ~p up", [Node]),
- {noreply, State};
handle_info({nodedown, Node}, State) ->
- rabbit_log:info("node ~p down", [Node]),
- %% TODO: This may turn out to be a performance hog when there are
- %% lots of nodes. We really only need to execute this code on
- %% *one* node, rather than all of them.
- ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node),
+ rabbit_log:info("node ~p down~n", [Node]),
+ ok = handle_dead_rabbit(Node),
+ {noreply, State};
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
+ rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+ ok = handle_dead_rabbit(Node),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) ->
+%% TODO: This may turn out to be a performance hog when there are
+%% lots of nodes. We really only need to execute this code on
+%% *one* node, rather than all of them.
+handle_dead_rabbit(Node) ->
+ ok = rabbit_networking:on_node_down(Node),
+ ok = rabbit_amqqueue:on_node_down(Node).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b5d82ac2..1781469a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -357,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw({handshake_timeout, State#v1.callback})
timeout ->
- throw({timeout, State#v1.connection_state});
+ case State#v1.connection_state of
+ closed -> mainloop(Deb, State);
+ S -> throw({timeout, S})
+ end;
{'$gen_call', From, {shutdown, Explanation}} ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -922,10 +925,14 @@ socket_info(Get, Select) ->
ssl_info(F, Sock) ->
+ %% The first ok form is R14
+ %% The second is R13 - the extra term is exportability (by inspection,
+ %% the docs are wrong)
case rabbit_net:ssl_info(Sock) of
- nossl -> '';
- {error, _} -> '';
- {ok, Info} -> F(Info)
+ nossl -> '';
+ {error, _} -> '';
+ {ok, {P, {K, C, H}}} -> F({P, {K, C, H}});
+ {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
cert_info(F, Sock) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 49b09508..58c369b5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -26,6 +26,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
@@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ passed = test_queue_cleanup(SecondaryNode),
+ passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate
Node = node(),
Self = self(),
Remote = spawn(SecondaryNode,
- fun () -> A = test_delegates_async(Node),
- B = test_delegates_sync(Node),
- Self ! {self(), {A, B}}
+ fun () -> Rs = [ test_delegates_async(Node),
+ test_delegates_sync(Node),
+ test_queue_cleanup(Node),
+ test_declare_on_dead_queue(Node) ],
+ Self ! {self(), Rs}
{Remote, Result} ->
- Result = {passed, passed}
- after 2000 ->
+ Result = lists:duplicate(length(Result), passed)
+ after 30000 ->
@@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) ->
+test_queue_cleanup_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_queue_cleanup_receiver(Pid)
+ end.
+test_queue_cleanup(_SecondaryNode) ->
+ {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
+ receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
+ ok
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ rabbit:stop(),
+ rabbit:start(),
+ rabbit_channel:do(Ch, #'queue.declare'{ passive = true,
+ queue = ?CLEANUP_QUEUE_NAME }),
+ receive
+ {channel_exit, 1, {amqp_error, not_found, _, _}} ->
+ ok
+ after 2000 ->
+ throw(failed_to_receive_channel_exit)
+ end,
+ passed.
+test_declare_on_dead_queue(SecondaryNode) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
+ Self = self(),
+ Pid = spawn(SecondaryNode,
+ fun () ->
+ {new, #amqqueue{name = QueueName, pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ exit(QPid, kill),
+ Self ! {self(), killed, QPid}
+ end),
+ receive
+ {Pid, killed, QPid} ->
+ {existing, #amqqueue{name = QueueName,
+ pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ false = rabbit_misc:is_process_alive(QPid),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ true = rabbit_misc:is_process_alive(,
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ passed
+ after 2000 ->
+ throw(failed_to_create_and_kill_queue)
+ end.
control_action(Command, Args) ->