diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-17 01:16:12 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-17 01:16:12 +0000 |
commit | aea58d91b22163ee8cee76ce8d4b62224ebfa0e7 (patch) | |
tree | 7552d12081d8a50492b1674690091ec675a7e328 | |
parent | 2e3f4c1a4e2ae72b364f9ca37ee0976ad5809d9c (diff) | |
parent | 5ce6507e41f83da26f51e6139ef2de4020516d51 (diff) | |
download | rabbitmq-server-aea58d91b22163ee8cee76ce8d4b62224ebfa0e7.tar.gz |
merge bug 24265 into default (Somehow support DNs in rabbitmq_auth_mechanism_ssl)
-rw-r--r-- | docs/rabbitmqctl.1.xml | 7 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-server.bat | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-service.bat | 2 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 11 | ||||
-rw-r--r-- | src/rabbit.erl | 33 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 66 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 3 | ||||
-rw-r--r-- | src/rabbit_control.erl | 14 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 9 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 60 | ||||
-rw-r--r-- | src/rabbit_router.erl | 60 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
-rw-r--r-- | src/supervisor2.erl | 4 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 7 |
19 files changed, 148 insertions, 156 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 15755038..7268f090 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1273,9 +1273,10 @@ <para> Displays broker status information such as the running applications on the current Erlang node, RabbitMQ and - Erlang versions, OS name and memory statistics. (See - the <command>cluster_status</command> command to find - out which nodes are clustered and running.) + Erlang versions, OS name, memory and file descriptor + statistics. (See the <command>cluster_status</command> + command to find out which nodes are clustered and + running.) </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl status</screen> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 5ead1051..9301af6b 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -42,5 +42,6 @@ {reuseaddr, true}, {backlog, 128}, {nodelay, true}, + {linger, {true, 0}}, {exit_on_close, false}]} ]}]}. diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 1831f876..39a68c8e 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -117,7 +117,7 @@ exec erl \ -sasl sasl_error_logger false \ -rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ -rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ - -os_mon start_cpu_sup true \ + -os_mon start_cpu_sup false \ -os_mon start_disksup false \ -os_mon start_memsup false \ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index c27b418a..44ce1ce1 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -139,7 +139,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 0be1129a..1582bfb1 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -204,7 +204,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6c3f1b5f..c11fb54b 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -125,8 +125,7 @@ %% requesting process is considered to 'own' one more %% descriptor. release/0 is the inverse operation and releases a %% previously obtained descriptor. transfer/1 transfers ownership of a -%% file descriptor between processes. It is non-blocking. Obtain is -%% used to obtain permission to accept file descriptors. Obtain has a +%% file descriptor between processes. It is non-blocking. Obtain has a %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use %% the entire limit, but will be evicted by obtain calls up to the %% point at which no more obtain calls can be satisfied by the obtains @@ -262,7 +261,7 @@ -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [obtain_count, obtain_limit]). +-define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]). %%---------------------------------------------------------------------------- %% Public API @@ -790,8 +789,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; -i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(total_limit, #fhc_state{limit = Limit}) -> Limit; +i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2; +i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(sockets_used, #fhc_state{obtain_count = Count}) -> Count; i(Item, _) -> throw({bad_argument, Item}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit.erl b/src/rabbit.erl index 0a2681a2..607033cb 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -308,17 +308,28 @@ stop_and_halt() -> ok. status() -> - [{pid, list_to_integer(os:getpid())}, - {running_applications, application:which_applications(infinity)}, - {os, os:type()}, - {erlang_version, erlang:system_info(system_version)}, - {memory, erlang:memory()}] ++ - rabbit_misc:filter_exit_map( - fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, - [{vm_memory_high_watermark, {vm_memory_monitor, - get_vm_memory_high_watermark, []}}, - {vm_memory_limit, {vm_memory_monitor, - get_memory_limit, []}}]). + S1 = [{pid, list_to_integer(os:getpid())}, + {running_applications, application:which_applications(infinity)}, + {os, os:type()}, + {erlang_version, erlang:system_info(system_version)}, + {memory, erlang:memory()}], + S2 = rabbit_misc:filter_exit_map( + fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, + [{vm_memory_high_watermark, {vm_memory_monitor, + get_vm_memory_high_watermark, []}}, + {vm_memory_limit, {vm_memory_monitor, + get_memory_limit, []}}]), + S3 = rabbit_misc:with_exit_handler( + fun () -> [] end, + fun () -> [{file_descriptors, file_handle_cache:info()}] end), + S4 = [{processes, [{limit, erlang:system_info(process_limit)}, + {used, erlang:system_info(process_count)}]}, + {run_queue, erlang:statistics(run_queue)}, + {uptime, begin + {T,_} = erlang:statistics(wall_clock), + T div 1000 + end}], + S1 ++ S2 ++ S3 ++ S4. is_running() -> is_running(node()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 96017df8..41e644f2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -44,17 +44,17 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0]). +-export_type([name/0, qmsg/0, routing_result/0]). -type(name() :: rabbit_types:r('queue')). - +-type(qpids() :: [pid()]). -type(qlen() :: rabbit_types:ok(non_neg_integer())). -type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())). -type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}). -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -69,7 +69,8 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | - rabbit_types:error('not_found')). + rabbit_types:error('not_found'); + ([name()]) -> [rabbit_types:amqqueue()]). -spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). @@ -117,12 +118,13 @@ rabbit_types:error('in_use') | rabbit_types:error('not_empty')). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()). +-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). --spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) -> +-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). +-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). @@ -134,7 +136,7 @@ (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). +-spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -264,6 +266,10 @@ add_default_binding(#amqqueue{name = QueueName}) -> key = RoutingKey, args = []}). +lookup(Names) when is_list(Names) -> + %% Normally we'd call mnesia:dirty_read/1 here, but that is quite + %% expensive for reasons explained in rabbit_misc:dirty_read/1. + lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]); lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -419,14 +425,39 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver(QPid, Delivery = #delivery{immediate = true}) -> - gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); -deliver(QPid, Delivery = #delivery{mandatory = true}) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity), - true; -deliver(QPid, Delivery) -> - gen_server2:cast(QPid, {deliver, Delivery}), - true. +deliver([], #delivery{mandatory = false, immediate = false}) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + delegate:invoke_no_result( + QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -518,6 +549,9 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +qpids(Qs) -> lists:append([[QPid | SPids] || + #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). + safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ba20b355..161f9787 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -877,9 +877,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery}, _From, State) -> - %% Synchronous, "immediate" delivery mode - %% +handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% FIXME: Is this correct semantics? %% %% I'm worried in particular about the case where an exchange has @@ -897,8 +895,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> false -> discard_delivery(Delivery, State1) end); -handle_call({deliver, Delivery}, From, State) -> - %% Synchronous, "mandatory" delivery mode. Reply asap. +handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b266d366..b116821c 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -29,7 +29,7 @@ -type(properties_input() :: (rabbit_framing:amqp_property_record() | [{atom(), any()}])). -type(publish_result() :: - ({ok, rabbit_router:routing_result(), [pid()]} + ({ok, rabbit_amqqueue:routing_result(), [pid()]} | rabbit_types:error('not_found'))). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). @@ -88,8 +88,8 @@ publish(Delivery = #delivery{ end. publish(X, Delivery) -> - {RoutingRes, DeliveredQPids} = - rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), + Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), {ok, RoutingRes, DeliveredQPids}. delivery(Mandatory, Immediate, Message, MsgSeqNo) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9b2fe28c..f14b2973 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1362,7 +1362,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, QNames}, State) -> - {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery), + {RoutingRes, DeliveredQPids} = + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery), State1 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State), maybe_incr_stats([{XName, 1} | diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 20486af5..22b57b1a 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -79,6 +79,12 @@ start() -> io:format(Format ++ " ...~n", Args1) end end, + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, + %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values case catch action(Command, Node, Args, Opts, Inform) of @@ -88,9 +94,11 @@ start() -> false -> io:format("...done.~n") end, rabbit_misc:quit(0); - {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]), + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15 + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15 + PrintInvalidCommandError(), usage(); {'EXIT', {badarg, _}} -> print_error("invalid parameter: ~p", [Args]), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d68063db..8d69a108 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -148,9 +148,8 @@ init([#amqqueue { name = QueueName } = Q]) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> - %% Synchronous, "immediate" delivery mode - +handle_call({deliver, Delivery = #delivery { immediate = true }}, + From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the %% msg via gm, or b) the master dies before we receive the msg via %% gm. In the case of (a), we will eventually receive the msg via @@ -166,8 +165,8 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> gen_server2:reply(From, false), %% master may deliver it, not us noreply(maybe_enqueue_message(Delivery, false, State)); -handle_call({deliver, Delivery = #delivery {}}, From, State) -> - %% Synchronous, "mandatory" delivery mode +handle_call({deliver, Delivery = #delivery { mandatory = true }}, + From, State) -> gen_server2:reply(From, true), %% amqqueue throws away the result anyway noreply(maybe_enqueue_message(Delivery, true, State)); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 045cc969..fce61129 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -31,7 +31,7 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). --define(CLOSING_TIMEOUT, 1). +-define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). @@ -345,10 +345,6 @@ handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). -switch_callback(State = #v1{connection_state = blocked, - heartbeater = Heartbeater}, Callback, Length) -> - ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = Callback, recv_len = Length}; switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. @@ -380,28 +376,22 @@ close_connection(State = #v1{queue_collector = Collector, rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. - TimeoutMillisec = - 1000 * if TimeoutSec > 0 andalso - TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; - true -> ?CLOSING_TIMEOUT - end, - erlang:send_after(TimeoutMillisec, self(), terminate_connection), + erlang:send_after((if TimeoutSec > 0 andalso + TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; + true -> ?CLOSING_TIMEOUT + end) * 1000, self(), terminate_connection), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> - case termination_kind(Reason) of - controlled -> - channel_cleanup(ChPid), + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, uncontrolled} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_Channel, controlled} -> maybe_close(State); - uncontrolled -> - case channel_cleanup(ChPid) of - undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> rabbit_log:error( - "connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), - maybe_close( - handle_exception(State, Channel, Reason)) - end + {Channel, uncontrolled} -> + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close(handle_exception(State, Channel, Reason)) end. channel_cleanup(ChPid) -> @@ -436,19 +426,15 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case channel_cleanup(ChPid) of - undefined -> + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, _} -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> - case termination_kind(Reason) of - controlled -> - ok; - uncontrolled -> - rabbit_log:error( - "connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]) - end, + {_Channel, controlled} -> + wait_for_channel_termination(N-1, TimerRef); + {Channel, uncontrolled} -> + rabbit_log:error("connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), wait_for_channel_termination(N-1, TimerRef) end; cancel_wait -> @@ -525,7 +511,9 @@ post_process_frame({method, MethodName, _}, _ChPid, case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), case State#v1.connection_state of - blocking -> State#v1{connection_state = blocked}; + blocking -> ok = rabbit_heartbeat:pause_monitor( + State#v1.heartbeater), + State#v1{connection_state = blocked}; _ -> State end; false -> State diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 31f5ad14..219833b7 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -18,21 +18,17 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([deliver/2, match_bindings/2, match_routing_key/2]). +-export([match_bindings/2, match_routing_key/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([routing_key/0, routing_result/0, match_result/0]). +-export_type([routing_key/0, match_result/0]). -type(routing_key() :: binary()). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(qpids() :: [pid()]). -type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). -spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> match_result()). @@ -44,38 +40,6 @@ %%---------------------------------------------------------------------------- -deliver([], #delivery{mandatory = false, - immediate = false}) -> - %% /dev/null optimisation - {routed, []}; - -deliver(QNames, Delivery = #delivery{mandatory = false, - immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. - QPids = lookup_qpids(QNames), - delegate:invoke_no_result( - QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - {routed, QPids}; - -deliver(QNames, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> - QPids = lookup_qpids(QNames), - {Success, _} = - delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) - end), - {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Mandatory, Immediate, {Routed, Handled}). - - %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source match_bindings(SrcName, Match) -> @@ -104,26 +68,6 @@ match_routing_key(SrcName, [_|_] = RoutingKeys) -> %%-------------------------------------------------------------------- -fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; -fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. - -%% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) -check_delivery(true, _ , {false, []}) -> {unroutable, []}; -check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; -check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. - -%% Normally we'd call mnesia:dirty_read/1 here, but that is quite -%% expensive for the reasons explained in rabbit_misc:dirty_read/1. -lookup_qpids(QNames) -> - lists:foldl(fun (QName, QPids) -> - case ets:lookup(rabbit_queue, QName) of - [#amqqueue{pid = QPid, slave_pids = SPids}] -> - [QPid | SPids ++ QPids]; - [] -> - QPids - end - end, [], QNames). - %% Normally we'd call mnesia:dirty_select/2 here, but that is quite %% expensive for the same reasons as above, and, additionally, due to %% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index ff87d989..e524446e 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -59,7 +59,7 @@ peer_cert_subject(Cert) -> format_rdn_sequence(Subject) end, Cert). -%% Return a part of the certificate's subject. +%% Return the parts of the certificate's subject. peer_cert_subject_items(Cert, Type) -> cert_info(fun(#'OTPCertificate' { tbsCertificate = #'OTPTBSCertificate' { diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 00d46f5a..9afb95b9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2232,7 +2232,7 @@ with_fresh_variable_queue(Fun) -> _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. -publish_and_confirm(QPid, Payload, Count) -> +publish_and_confirm(Q, Payload, Count) -> Seqs = lists:seq(1, Count), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2240,7 +2240,7 @@ publish_and_confirm(QPid, Payload, Count) -> Payload), Delivery = #delivery{mandatory = false, immediate = false, sender = self(), message = Msg, msg_seq_no = Seq}, - true = rabbit_amqqueue:deliver(QPid, Delivery) + {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). @@ -2477,7 +2477,7 @@ test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - publish_and_confirm(QPid, <<>>, Count), + publish_and_confirm(Q, <<>>, Count), exit(QPid, kill), MRef = erlang:monitor(process, QPid), @@ -2507,7 +2507,7 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:declare(test_queue(), true, false, [], none), Payload = <<0:8388608>>, %% 1MB Count = 30, - publish_and_confirm(QPid, Payload, Count), + publish_and_confirm(Q, Payload, Count), rabbit_amqqueue:set_ram_duration_target(QPid, 0), diff --git a/src/supervisor2.erl b/src/supervisor2.erl index f75da872..26ea502c 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined -> ok; {error, normal} -> case Child#child.restart_type of - permanent -> ReportError(normal); - {permanent, _Delay} -> ReportError(normal); + permanent -> ReportError(normal, Child); + {permanent, _Delay} -> ReportError(normal, Child); _ -> ok end; {error, OtherReason} -> diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 0d50683d..8678c2c9 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -86,6 +86,13 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, %% know this will fail. {stop, normal, State}; +handle_info({inet_async, LSock, Ref, {error, Reason}}, + State=#state{sock=LSock, ref=Ref}) -> + {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", + [rabbit_misc:ntoab(Address), Port, Reason]), + accept(State); + handle_info(_Info, State) -> {noreply, State}. |