summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-17 01:16:12 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-17 01:16:12 +0000
commitaea58d91b22163ee8cee76ce8d4b62224ebfa0e7 (patch)
tree7552d12081d8a50492b1674690091ec675a7e328
parent2e3f4c1a4e2ae72b364f9ca37ee0976ad5809d9c (diff)
parent5ce6507e41f83da26f51e6139ef2de4020516d51 (diff)
downloadrabbitmq-server-aea58d91b22163ee8cee76ce8d4b62224ebfa0e7.tar.gz
merge bug 24265 into default (Somehow support DNs in rabbitmq_auth_mechanism_ssl)
-rw-r--r--docs/rabbitmqctl.1.xml7
-rw-r--r--ebin/rabbit_app.in1
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmq-service.bat2
-rw-r--r--src/file_handle_cache.erl11
-rw-r--r--src/rabbit.erl33
-rw-r--r--src/rabbit_amqqueue.erl66
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_basic.erl6
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_control.erl14
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
-rw-r--r--src/rabbit_reader.erl60
-rw-r--r--src/rabbit_router.erl60
-rw-r--r--src/rabbit_ssl.erl2
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/supervisor2.erl4
-rw-r--r--src/tcp_acceptor.erl7
19 files changed, 148 insertions, 156 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 15755038..7268f090 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1273,9 +1273,10 @@
<para>
Displays broker status information such as the running
applications on the current Erlang node, RabbitMQ and
- Erlang versions, OS name and memory statistics. (See
- the <command>cluster_status</command> command to find
- out which nodes are clustered and running.)
+ Erlang versions, OS name, memory and file descriptor
+ statistics. (See the <command>cluster_status</command>
+ command to find out which nodes are clustered and
+ running.)
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl status</screen>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 5ead1051..9301af6b 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -42,5 +42,6 @@
{reuseaddr, true},
{backlog, 128},
{nodelay, true},
+ {linger, {true, 0}},
{exit_on_close, false}]}
]}]}.
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 1831f876..39a68c8e 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -117,7 +117,7 @@ exec erl \
-sasl sasl_error_logger false \
-rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
- -os_mon start_cpu_sup true \
+ -os_mon start_cpu_sup false \
-os_mon start_disksup false \
-os_mon start_memsup false \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index c27b418a..44ce1ce1 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -139,7 +139,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 0be1129a..1582bfb1 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -204,7 +204,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6c3f1b5f..c11fb54b 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -125,8 +125,7 @@
%% requesting process is considered to 'own' one more
%% descriptor. release/0 is the inverse operation and releases a
%% previously obtained descriptor. transfer/1 transfers ownership of a
-%% file descriptor between processes. It is non-blocking. Obtain is
-%% used to obtain permission to accept file descriptors. Obtain has a
+%% file descriptor between processes. It is non-blocking. Obtain has a
%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
%% the entire limit, but will be evicted by obtain calls up to the
%% point at which no more obtain calls can be satisfied by the obtains
@@ -262,7 +261,7 @@
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [obtain_count, obtain_limit]).
+-define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]).
%%----------------------------------------------------------------------------
%% Public API
@@ -790,8 +789,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(obtain_count, #fhc_state{obtain_count = Count}) -> Count;
-i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(total_limit, #fhc_state{limit = Limit}) -> Limit;
+i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2;
+i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(sockets_used, #fhc_state{obtain_count = Count}) -> Count;
i(Item, _) -> throw({bad_argument, Item}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0a2681a2..607033cb 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -308,17 +308,28 @@ stop_and_halt() ->
ok.
status() ->
- [{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications(infinity)},
- {os, os:type()},
- {erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}] ++
- rabbit_misc:filter_exit_map(
- fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
- [{vm_memory_high_watermark, {vm_memory_monitor,
- get_vm_memory_high_watermark, []}},
- {vm_memory_limit, {vm_memory_monitor,
- get_memory_limit, []}}]).
+ S1 = [{pid, list_to_integer(os:getpid())},
+ {running_applications, application:which_applications(infinity)},
+ {os, os:type()},
+ {erlang_version, erlang:system_info(system_version)},
+ {memory, erlang:memory()}],
+ S2 = rabbit_misc:filter_exit_map(
+ fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
+ [{vm_memory_high_watermark, {vm_memory_monitor,
+ get_vm_memory_high_watermark, []}},
+ {vm_memory_limit, {vm_memory_monitor,
+ get_memory_limit, []}}]),
+ S3 = rabbit_misc:with_exit_handler(
+ fun () -> [] end,
+ fun () -> [{file_descriptors, file_handle_cache:info()}] end),
+ S4 = [{processes, [{limit, erlang:system_info(process_limit)},
+ {used, erlang:system_info(process_count)}]},
+ {run_queue, erlang:statistics(run_queue)},
+ {uptime, begin
+ {T,_} = erlang:statistics(wall_clock),
+ T div 1000
+ end}],
+ S1 ++ S2 ++ S3 ++ S4.
is_running() -> is_running(node()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 96017df8..41e644f2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,17 +44,17 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, routing_result/0]).
-type(name() :: rabbit_types:r('queue')).
-
+-type(qpids() :: [pid()]).
-type(qlen() :: rabbit_types:ok(non_neg_integer())).
-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -69,7 +69,8 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
- rabbit_types:error('not_found')).
+ rabbit_types:error('not_found');
+ ([name()]) -> [rabbit_types:amqqueue()]).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
@@ -117,12 +118,13 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
+-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
--spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
+-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
@@ -134,7 +136,7 @@
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
+-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -264,6 +266,10 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup(Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_misc:dirty_read/1.
+ lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]);
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -419,14 +425,39 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver(QPid, Delivery = #delivery{immediate = true}) ->
- gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
-deliver(QPid, Delivery = #delivery{mandatory = true}) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity),
- true;
-deliver(QPid, Delivery) ->
- gen_server2:cast(QPid, {deliver, Delivery}),
- true.
+deliver([], #delivery{mandatory = false, immediate = false}) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
+ immediate = Immediate}) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -518,6 +549,9 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+qpids(Qs) -> lists:append([[QPid | SPids] ||
+ #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba20b355..161f9787 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -877,9 +877,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery}, _From, State) ->
- %% Synchronous, "immediate" delivery mode
- %%
+handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
%% FIXME: Is this correct semantics?
%%
%% I'm worried in particular about the case where an exchange has
@@ -897,8 +895,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
false -> discard_delivery(Delivery, State1)
end);
-handle_call({deliver, Delivery}, From, State) ->
- %% Synchronous, "mandatory" delivery mode. Reply asap.
+handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index b266d366..b116821c 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -29,7 +29,7 @@
-type(properties_input() ::
(rabbit_framing:amqp_property_record() | [{atom(), any()}])).
-type(publish_result() ::
- ({ok, rabbit_router:routing_result(), [pid()]}
+ ({ok, rabbit_amqqueue:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
@@ -88,8 +88,8 @@ publish(Delivery = #delivery{
end.
publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9b2fe28c..f14b2973 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1362,7 +1362,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
QNames}, State) ->
- {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
+ {RoutingRes, DeliveredQPids} =
+ rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery),
State1 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State),
maybe_incr_stats([{XName, 1} |
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 20486af5..22b57b1a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -79,6 +79,12 @@ start() ->
io:format(Format ++ " ...~n", Args1)
end
end,
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
+
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
case catch action(Command, Node, Args, Opts, Inform) of
@@ -88,9 +94,11 @@ start() ->
false -> io:format("...done.~n")
end,
rabbit_misc:quit(0);
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15
+ PrintInvalidCommandError(),
usage();
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d68063db..8d69a108 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -148,9 +148,8 @@ init([#amqqueue { name = QueueName } = Q]) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
- %% Synchronous, "immediate" delivery mode
-
+handle_call({deliver, Delivery = #delivery { immediate = true }},
+ From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the
%% msg via gm, or b) the master dies before we receive the msg via
%% gm. In the case of (a), we will eventually receive the msg via
@@ -166,8 +165,8 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
gen_server2:reply(From, false), %% master may deliver it, not us
noreply(maybe_enqueue_message(Delivery, false, State));
-handle_call({deliver, Delivery = #delivery {}}, From, State) ->
- %% Synchronous, "mandatory" delivery mode
+handle_call({deliver, Delivery = #delivery { mandatory = true }},
+ From, State) ->
gen_server2:reply(From, true), %% amqqueue throws away the result anyway
noreply(maybe_enqueue_message(Delivery, true, State));
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 045cc969..fce61129 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -31,7 +31,7 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
--define(CLOSING_TIMEOUT, 1).
+-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
@@ -345,10 +345,6 @@ handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
-switch_callback(State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}, Callback, Length) ->
- ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_len = Length};
switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
@@ -380,28 +376,22 @@ close_connection(State = #v1{queue_collector = Collector,
rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
- TimeoutMillisec =
- 1000 * if TimeoutSec > 0 andalso
- TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
- true -> ?CLOSING_TIMEOUT
- end,
- erlang:send_after(TimeoutMillisec, self(), terminate_connection),
+ erlang:send_after((if TimeoutSec > 0 andalso
+ TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
+ true -> ?CLOSING_TIMEOUT
+ end) * 1000, self(), terminate_connection),
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case termination_kind(Reason) of
- controlled ->
- channel_cleanup(ChPid),
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, uncontrolled} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_Channel, controlled} ->
maybe_close(State);
- uncontrolled ->
- case channel_cleanup(ChPid) of
- undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> rabbit_log:error(
- "connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(
- handle_exception(State, Channel, Reason))
- end
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(ChPid) ->
@@ -436,19 +426,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case channel_cleanup(ChPid) of
- undefined ->
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, _} ->
exit({abnormal_dependent_exit, ChPid, Reason});
- Channel ->
- case termination_kind(Reason) of
- controlled ->
- ok;
- uncontrolled ->
- rabbit_log:error(
- "connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason])
- end,
+ {_Channel, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef);
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
@@ -525,7 +511,9 @@ post_process_frame({method, MethodName, _}, _ChPid,
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
case State#v1.connection_state of
- blocking -> State#v1{connection_state = blocked};
+ blocking -> ok = rabbit_heartbeat:pause_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = blocked};
_ -> State
end;
false -> State
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 31f5ad14..219833b7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -18,21 +18,17 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2, match_bindings/2, match_routing_key/2]).
+-export([match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0, match_result/0]).
+-export_type([routing_key/0, match_result/0]).
-type(routing_key() :: binary()).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
--type(qpids() :: [pid()]).
-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
@@ -44,38 +40,6 @@
%%----------------------------------------------------------------------------
-deliver([], #delivery{mandatory = false,
- immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(QNames, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = lookup_qpids(QNames),
- delegate:invoke_no_result(
- QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {routed, QPids};
-
-deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = lookup_qpids(QNames),
- {Success, _} =
- delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
- end),
- {Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- check_delivery(Mandatory, Immediate, {Routed, Handled}).
-
-
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source
match_bindings(SrcName, Match) ->
@@ -104,26 +68,6 @@ match_routing_key(SrcName, [_|_] = RoutingKeys) ->
%%--------------------------------------------------------------------
-fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
-fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
-
-%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {unroutable, []};
-check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
-check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
-
-%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
-%% expensive for the reasons explained in rabbit_misc:dirty_read/1.
-lookup_qpids(QNames) ->
- lists:foldl(fun (QName, QPids) ->
- case ets:lookup(rabbit_queue, QName) of
- [#amqqueue{pid = QPid, slave_pids = SPids}] ->
- [QPid | SPids ++ QPids];
- [] ->
- QPids
- end
- end, [], QNames).
-
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
%% expensive for the same reasons as above, and, additionally, due to
%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index ff87d989..e524446e 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -59,7 +59,7 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
-%% Return a part of the certificate's subject.
+%% Return the parts of the certificate's subject.
peer_cert_subject_items(Cert, Type) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 00d46f5a..9afb95b9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2232,7 +2232,7 @@ with_fresh_variable_queue(Fun) ->
_ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
-publish_and_confirm(QPid, Payload, Count) ->
+publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2240,7 +2240,7 @@ publish_and_confirm(QPid, Payload, Count) ->
Payload),
Delivery = #delivery{mandatory = false, immediate = false,
sender = self(), message = Msg, msg_seq_no = Seq},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
+ {routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
@@ -2477,7 +2477,7 @@ test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- publish_and_confirm(QPid, <<>>, Count),
+ publish_and_confirm(Q, <<>>, Count),
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
@@ -2507,7 +2507,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- publish_and_confirm(QPid, Payload, Count),
+ publish_and_confirm(Q, Payload, Count),
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index f75da872..26ea502c 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
ok;
{error, normal} ->
case Child#child.restart_type of
- permanent -> ReportError(normal);
- {permanent, _Delay} -> ReportError(normal);
+ permanent -> ReportError(normal, Child);
+ {permanent, _Delay} -> ReportError(normal, Child);
_ -> ok
end;
{error, OtherReason} ->
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 0d50683d..8678c2c9 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -86,6 +86,13 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
%% know this will fail.
{stop, normal, State};
+handle_info({inet_async, LSock, Ref, {error, Reason}},
+ State=#state{sock=LSock, ref=Ref}) ->
+ {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
+ [rabbit_misc:ntoab(Address), Port, Reason]),
+ accept(State);
+
handle_info(_Info, State) ->
{noreply, State}.