diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-02-10 10:51:26 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-02-10 10:51:26 +0000 |
commit | 36e0842143bc26c87af6cee699977dd416194cb4 (patch) | |
tree | 66bc7effc428e16a436b6466368e50b7b589ce19 /src | |
parent | fbc8e8a3be7671c02d73b7af0f9342fcf2757b5e (diff) | |
parent | a1460c9cc3b1ccce1f784b37466180da3d0d4f02 (diff) | |
download | rabbitmq-server-36e0842143bc26c87af6cee699977dd416194cb4.tar.gz |
Merge bug23813 (abstract rabbit out of delegate)
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 18 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 16 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 53 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 15 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 70 |
8 files changed, 164 insertions, 48 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 81e0e54b..1beed5c1 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -152,6 +152,11 @@ [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({notify_cluster, + [{description, "notify cluster nodes"}, + {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {requires, networking}]}). + %%--------------------------------------------------------------------------- -include("rabbit_framing.hrl"). @@ -227,11 +232,11 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, SupPid} = rabbit_sup:start_link(), + true = register(rabbit, self()), print_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), - {ok, SupPid}; Error -> Error diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a6da551d..1d423809 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:const(not_found) end; [ExistingQ = #amqqueue{pid = QPid}] -> - case is_process_alive(QPid) of + case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), fun (Tx) -> TailFun(Tx), ExistingQ end @@ -356,7 +356,8 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> + delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c7e28fe..496b2064 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -790,20 +790,20 @@ handle_call({init, Recover}, From, handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined} = State, - gen_server2:reply(From, not_found), - case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) - end, - BQS = BQ:init(QName, IsDurable, Recover), - %% Rely on terminate to delete the queue. - {stop, normal, State#q{backing_queue_state = BQS}} + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + false -> #q{backing_queue = BQ, backing_queue_state = undefined, + q = #amqqueue{name = QName, durable = IsDurable}} = State, + gen_server2:reply(From, not_found), + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 7d916797..abc27c5f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). +-export([is_process_alive/1]). %%---------------------------------------------------------------------------- @@ -194,6 +195,7 @@ -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). -endif. @@ -350,8 +352,11 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch exit:{R, _} when R =:= noproc; R =:= nodedown; - R =:= normal; R =:= shutdown -> + catch + exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> + Handler(); + exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> Handler() end. @@ -858,3 +863,12 @@ ntoab(IP) -> 0 -> Str; _ -> "[" ++ Str ++ "]" end. + +is_process_alive(Pid) when node(Pid) =:= node() -> + erlang:is_process_alive(Pid); +is_process_alive(Pid) -> + case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of + true -> true; + _ -> false + end. + diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 283d25c7..36f61628 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -32,16 +32,6 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). --define(RABBIT_TCP_OPTS, [ - binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - {backlog, 128}, % use the maximum listen(2) backlog value - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false} - ]). - -define(SSL_TIMEOUT, 5). %% seconds -define(FIRST_TEST_BIND_PORT, 10000). @@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS], + [IPAddress, Port, [Family | tcp_opts()], {?MODULE, tcp_listener_started, [Protocol]}, {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, @@ -315,6 +305,10 @@ hostname() -> cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). +tcp_opts() -> + {ok, Opts} = application:get_env(rabbit, tcp_listen_options), + Opts. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index e4bc1cdc..817abaa2 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,14 +22,41 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([notify_cluster/0, rabbit_running_on/1]). -define(SERVER, ?MODULE). +-define(RABBIT_UP_RPC_TIMEOUT, 2000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(rabbit_running_on/1 :: (node()) -> 'ok'). +-spec(notify_cluster/0 :: () -> 'ok'). + +-endif. %%-------------------------------------------------------------------- start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +rabbit_running_on(Node) -> + gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). + +notify_cluster() -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this rabbit + case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, + [Node], ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + %% register other active rabbits with this rabbit + [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ], + ok. + %%-------------------------------------------------------------------- init([]) -> @@ -39,19 +66,20 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({rabbit_running_on, Node}, State) -> + rabbit_log:info("node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({nodeup, Node}, State) -> - rabbit_log:info("node ~p up", [Node]), - {noreply, State}; handle_info({nodedown, Node}, State) -> - rabbit_log:info("node ~p down", [Node]), - %% TODO: This may turn out to be a performance hog when there are - %% lots of nodes. We really only need to execute this code on - %% *one* node, rather than all of them. - ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node), + rabbit_log:info("node ~p down~n", [Node]), + ok = handle_dead_rabbit(Node), + {noreply, State}; +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> + rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), + ok = handle_dead_rabbit(Node), {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- +%% TODO: This may turn out to be a performance hog when there are +%% lots of nodes. We really only need to execute this code on +%% *one* node, rather than all of them. +handle_dead_rabbit(Node) -> + ok = rabbit_networking:on_node_down(Node), + ok = rabbit_amqqueue:on_node_down(Node). + diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b5d82ac2..1781469a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -357,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({handshake_timeout, State#v1.callback}) end; timeout -> - throw({timeout, State#v1.connection_state}); + case State#v1.connection_state of + closed -> mainloop(Deb, State); + S -> throw({timeout, S}) + end; {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -922,10 +925,14 @@ socket_info(Get, Select) -> end. ssl_info(F, Sock) -> + %% The first ok form is R14 + %% The second is R13 - the extra term is exportability (by inspection, + %% the docs are wrong) case rabbit_net:ssl_info(Sock) of - nossl -> ''; - {error, _} -> ''; - {ok, Info} -> F(Info) + nossl -> ''; + {error, _} -> ''; + {ok, {P, {K, C, H}}} -> F({P, {K, C, H}}); + {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}}) end. cert_info(F, Sock) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49b09508..58c369b5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -26,6 +26,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + passed = test_queue_cleanup(SecondaryNode), + passed = test_declare_on_dead_queue(SecondaryNode), %% we now run the tests remotely, so that code coverage on the %% local node picks up more of the delegate Node = node(), Self = self(), Remote = spawn(SecondaryNode, - fun () -> A = test_delegates_async(Node), - B = test_delegates_sync(Node), - Self ! {self(), {A, B}} + fun () -> Rs = [ test_delegates_async(Node), + test_delegates_sync(Node), + test_queue_cleanup(Node), + test_declare_on_dead_queue(Node) ], + Self ! {self(), Rs} end), receive {Remote, Result} -> - Result = {passed, passed} - after 2000 -> + Result = lists:duplicate(length(Result), passed) + after 30000 -> throw(timeout) end, @@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) -> passed. +test_queue_cleanup_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_queue_cleanup_receiver(Pid) + end. + + +test_queue_cleanup(_SecondaryNode) -> + {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), + receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> + ok + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + rabbit:stop(), + rabbit:start(), + rabbit_channel:do(Ch, #'queue.declare'{ passive = true, + queue = ?CLEANUP_QUEUE_NAME }), + receive + {channel_exit, 1, {amqp_error, not_found, _, _}} -> + ok + after 2000 -> + throw(failed_to_receive_channel_exit) + end, + passed. + +test_declare_on_dead_queue(SecondaryNode) -> + QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME), + Self = self(), + Pid = spawn(SecondaryNode, + fun () -> + {new, #amqqueue{name = QueueName, pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], + none), + exit(QPid, kill), + Self ! {self(), killed, QPid} + end), + receive + {Pid, killed, QPid} -> + {existing, #amqqueue{name = QueueName, + pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], none), + false = rabbit_misc:is_process_alive(QPid), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], + none), + true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + {ok, 0} = rabbit_amqqueue:delete(Q, false, false), + passed + after 2000 -> + throw(failed_to_create_and_kill_queue) + end. + %--------------------------------------------------------------------- control_action(Command, Args) -> |