summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Powell <steve@rabbitmq.com>2012-01-30 14:41:12 +0000
committerSteve Powell <steve@rabbitmq.com>2012-01-30 14:41:12 +0000
commit892ab6014a50a6d911131c79969119a7cffcd75b (patch)
treec5856499b6d40d7b2764d38c37313b0399c67750
parentba94be375854b4b074dcbffc28e3537b364fc2b8 (diff)
parentaae40a595c494e8c2e4d4c153da95523417a5c13 (diff)
downloadrabbitmq-server-892ab6014a50a6d911131c79969119a7cffcd75b.tar.gz
Merge default in
-rw-r--r--docs/html-to-website-xml.xsl44
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--include/rabbit.hrl10
-rw-r--r--src/credit_flow.erl121
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_access_control.erl1
-rw-r--r--src/rabbit_amqqueue.erl78
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_channel.erl157
-rw-r--r--src/rabbit_log.erl33
-rw-r--r--src/rabbit_mirror_queue_slave.erl11
-rw-r--r--src/rabbit_networking.erl7
-rw-r--r--src/rabbit_reader.erl104
-rw-r--r--src/rabbit_writer.erl2
-rw-r--r--src/tcp_listener.erl5
15 files changed, 382 insertions, 250 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 88aa2e78..d83d5073 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -8,8 +8,6 @@
<xsl:output method="xml" />
-<xsl:template match="*"/>
-
<!-- Copy every element through -->
<xsl:template match="*">
<xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml">
@@ -28,36 +26,30 @@
<head>
<title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
</head>
- <body>
- <doc:div>
- <xsl:choose>
+ <body show-in-this-page="true">
+ <xsl:choose>
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
- <p>
- This is the manual page for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
- </p>
- <p>
- <a href="../manpages.html">See a list of all manual pages</a>.
- </p>
+ <p>
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ </p>
+ <p>
+ <a href="../manpages.html">See a list of all manual pages</a>.
+ </p>
</xsl:when>
<xsl:otherwise>
- <p>
- This is the documentation for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
- </p>
+ <p>
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ </p>
</xsl:otherwise>
- </xsl:choose>
- <p>
+ </xsl:choose>
+ <p>
For more general documentation, please see the
- <a href="../admin-guide.html">administrator's guide</a>.
- </p>
-
- <doc:toc class="compact">
- <doc:heading>Table of Contents</doc:heading>
- </doc:toc>
+ <a href="../admin-guide.html">administrator's guide</a>.
+ </p>
- <xsl:apply-templates select="body/div[@class='refentry']"/>
- </doc:div>
+ <xsl:apply-templates select="body/div[@class='refentry']"/>
</body>
</html>
</xsl:template>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 7268f090..4100864e 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1067,10 +1067,26 @@
<listitem><para>The period for which the peer's SSL
certificate is valid.</para></listitem>
</varlistentry>
+
+ <varlistentry>
+ <term>last_blocked_by</term>
+ <listitem><para>The reason for which this connection
+ was last blocked. One of 'mem' - due to a memory
+ alarm, 'flow' - due to internal flow control, or
+ 'none' if the connection was never
+ blocked.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>last_blocked_age</term>
+ <listitem><para>Time, in seconds, since this
+ connection was last blocked, or
+ 'infinity'.</para></listitem>
+ </varlistentry>
+
<varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
@@ -1127,8 +1143,9 @@
</varlistentry>
</variablelist>
<para>
- If no <command>connectioninfoitem</command>s are specified then user, peer
- address, peer port and connection state are displayed.
+ If no <command>connectioninfoitem</command>s are
+ specified then user, peer address, peer port, time since
+ flow control and memory block state are displayed.
</para>
<para role="example-prefix">
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d81b82db..c38eca7c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -98,13 +98,3 @@
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
-
--ifdef(debug).
--define(LOGDEBUG0(F), rabbit_log:debug(F)).
--define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
--define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
--else.
--define(LOGDEBUG0(F), ok).
--define(LOGDEBUG(F,A), ok).
--define(LOGMESSAGE(D,C,M,Co), ok).
--endif.
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
new file mode 100644
index 00000000..7df6c92a
--- /dev/null
+++ b/src/credit_flow.erl
@@ -0,0 +1,121 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(credit_flow).
+
+%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep
+%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more
+%% credit by sending a message to the sender. The sender should pass
+%% this message in to handle_bump_msg/1. The sender should block when
+%% it goes below 0 (check by invoking blocked/0). If a process is both
+%% a sender and a receiver it will not grant any more credit to its
+%% senders when it is itself blocked - thus the only processes that
+%% need to check blocked/0 are ones that read from network sockets.
+
+-define(MAX_CREDIT, 200).
+-define(MORE_CREDIT_AT, 150).
+
+-export([ack/1, handle_bump_msg/1, blocked/0, send/1]).
+-export([peer_down/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-opaque(bump_msg() :: {pid(), non_neg_integer()}).
+
+-spec(ack/1 :: (pid()) -> 'ok').
+-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
+-spec(blocked/0 :: () -> boolean()).
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(peer_down/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% There are two "flows" here; of messages and of credit, going in
+%% opposite directions. The variable names "From" and "To" refer to
+%% the flow of credit, but the function names refer to the flow of
+%% messages. This is the clearest I can make it (since the function
+%% names form the API and want to make sense externally, while the
+%% variable names are used in credit bookkeeping and want to make
+%% sense internally).
+
+ack(To) ->
+ Credit =
+ case get({credit_to, To}, ?MAX_CREDIT) of
+ ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT),
+ ?MAX_CREDIT;
+ C -> C - 1
+ end,
+ put({credit_to, To}, Credit).
+
+handle_bump_msg({From, MoreCredit}) ->
+ Credit = get({credit_from, From}, 0) + MoreCredit,
+ put({credit_from, From}, Credit),
+ case Credit > 0 of
+ true -> unblock(From),
+ ok;
+ false -> ok
+ end.
+
+blocked() ->
+ get(credit_blocked, []) =/= [].
+
+send(From) ->
+ Credit = get({credit_from, From}, ?MAX_CREDIT) - 1,
+ case Credit of
+ 0 -> block(From);
+ _ -> ok
+ end,
+ put({credit_from, From}, Credit).
+
+peer_down(Peer) ->
+ %% In theory we could also remove it from credit_deferred here, but it
+ %% doesn't really matter; at some point later we will drain
+ %% credit_deferred and thus send messages into the void...
+ unblock(Peer),
+ erase({credit_from, Peer}),
+ erase({credit_to, Peer}).
+
+%% --------------------------------------------------------------------------
+
+grant(To, Quantity) ->
+ Msg = {bump_credit, {self(), Quantity}},
+ case blocked() of
+ false -> To ! Msg;
+ true -> Deferred = get(credit_deferred, []),
+ put(credit_deferred, [{To, Msg} | Deferred])
+ end.
+
+block(From) ->
+ put(credit_blocked, [From | get(credit_blocked, [])]).
+
+unblock(From) ->
+ NewBlocks = get(credit_blocked, []) -- [From],
+ put(credit_blocked, NewBlocks),
+ case NewBlocks of
+ [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ _ -> ok
+ end.
+
+get(Key, Default) ->
+ case get(Key) of
+ undefined -> Default;
+ Value -> Value
+ end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 9609eb04..3dcd4938 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -191,7 +191,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -442,8 +442,7 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
- [Reason, erlang:get_stacktrace()])
+ _:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -502,8 +501,14 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+boot_step_error(Reason, Stacktrace) ->
+ boot_error("Error description:~n ~p~n~n"
+ "Log files (may contain more information):~n ~s~n ~s~n~n"
+ "Stack trace:~n ~p~n~n",
+ [Reason, log_location(kernel), log_location(sasl), Stacktrace]).
+
boot_error(Format, Args) ->
- io:format("BOOT ERROR: " ++ Format, Args),
+ io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
error_logger:error_msg(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index ca28d686..ec9affa6 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) ->
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
- ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
check_access(
fun() ->
rabbit_vhost:exists(VHostPath) andalso
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 41e644f2..94a99a49 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -120,6 +120,8 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
+-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -425,39 +427,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver([], #delivery{mandatory = false, immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = qpids(Qs),
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
- {routed, QPids};
+deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
- end.
+deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -549,6 +521,46 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ case Flow of
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ noflow -> ok
+ end,
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {deliver, Delivery, Flow})
+ end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
+ _Flow) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
+
qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 161f9787..c21db21b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
init(Q) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
State = #q{q = Q#amqqueue{pid = self()},
@@ -135,7 +134,6 @@ init(Q) ->
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, AckTags, Deliveries, MTC) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -598,6 +596,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
+ case get({ch_publisher, DownPid}) of
+ undefined -> ok;
+ MRef -> erlang:demonitor(MRef),
+ erase({ch_publisher, DownPid}),
+ credit_flow:peer_down(DownPid)
+ end,
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -1018,8 +1022,17 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
@@ -1099,8 +1112,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- {stop, normal, State};
+ true -> {stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1148,7 +1160,6 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(Info, State) ->
- ?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f14b2973..603091b1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
@@ -78,6 +78,8 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -111,7 +113,11 @@ do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content}).
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
@@ -188,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = dict:new(),
+ queue_monitors = sets:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -244,7 +250,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content, Flow},
+ State = #ch{reader_pid = Reader}) ->
+ case Flow of
+ flow -> credit_flow:ack(Reader);
+ noflow -> ok
+ end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -299,13 +310,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State3#ch{next_tag = DeliveryTag + 1});
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -315,6 +326,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(timeout, State) ->
noreply(State);
@@ -327,9 +342,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
- dict:erase(QPid, State3#ch.queue_monitors)});
+ sets:del_element(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -527,7 +543,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
_ -> ok
end,
- demonitor_queue(QPid, State#ch{blocking = Blocking1})
+ State#ch{blocking = Blocking1}
end.
record_confirm(undefined, _, State) ->
@@ -565,8 +581,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
true -> UQM1 = gb_trees:delete(QPid, UQM),
- demonitor_queue(
- QPid, State#ch{unconfirmed_qm = UQM1});
+ State#ch{unconfirmed_qm = UQM1};
false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
State#ch{unconfirmed_qm = UQM1}
end;
@@ -672,7 +687,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
- none -> ack(Acked, State1);
+ none -> ack(Acked, State1),
+ State1;
in_progress -> State1#ch{uncommitted_acks =
Acked ++ State1#ch.uncommitted_acks}
end};
@@ -696,11 +712,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -710,7 +726,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State3#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -787,9 +803,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
false -> dict:store(QPid, CTags1, QCons)
end
end,
- NewState = demonitor_queue(
- Q, State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1}),
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1070,8 +1085,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
+ ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ)),
+ State1 = new_tx(State),
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
@@ -1111,10 +1126,7 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = lists:foldl(fun monitor_queue/2,
- State1#ch{blocking =
- sets:from_list(QPids)},
- QPids),
+ QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
ok = rabbit_amqqueue:flush_all(QPids, self()),
{noreply, State2}
end;
@@ -1145,31 +1157,12 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (not dict:is_key(QPid, QMons) andalso
- queue_monitor_needed(QPid, State)) of
- true -> MRef = erlang:monitor(process, QPid),
- State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
- false -> State
- end.
-
-demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (dict:is_key(QPid, QMons) andalso
- not queue_monitor_needed(QPid, State)) of
- true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
- State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ case not sets:is_element(QPid, QMons) of
+ true -> erlang:monitor(process, QPid),
+ State#ch{queue_monitors = sets:add_element(QPid, QMons)};
false -> State
end.
-queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
- blocking = Blocking,
- unconfirmed_qm = UQM} = State) ->
- StatsEnabled = rabbit_event:stats_level(
- State, #ch.stats_timer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QCons),
- QueueBlocked = sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1363,22 +1356,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
msg_seq_no = MsgSeqNo},
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State),
+ rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
+ State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1).
+ QPid <- DeliveredQPids]], publish, State2),
+ State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State));
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{XName, 1}], return_not_delivered, State));
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1396,7 +1391,7 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State0#ch{unconfirmed_qm = UQM1};
none ->
UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ State0#ch{unconfirmed_qm = UQM1}
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
@@ -1420,13 +1415,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- {MsgSeqNos, State1} =
- lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
- {[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm,
- State0)}
- end, {[], State}, lists:append(C)),
- send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
+ MsgSeqNos =
+ lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1506,26 +1500,21 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, State) ->
- State.
+maybe_incr_redeliver_stats(_, _, _State) ->
+ ok.
maybe_incr_stats(QXIncs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> lists:foldl(fun ({QX, Inc}, State0) ->
- incr_stats(QX, Inc, Measure, State0)
- end, State, QXIncs);
- _ -> State
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
end.
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure),
- State.
+incr_stats({_, _} = QX, Inc, Measure) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 558e0957..8f58f848 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -23,8 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([debug/1, debug/2, message/4, info/1, info/2,
- warning/1, warning/2, error/1, error/2]).
+-export([info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
@@ -33,8 +32,6 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(debug/1 :: (string()) -> 'ok').
--spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -42,8 +39,6 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
--spec(message/4 :: (_,_,_,_) -> 'ok').
-
-endif.
%%----------------------------------------------------------------------------
@@ -51,16 +46,6 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-debug(Fmt) ->
- gen_server:cast(?SERVER, {debug, Fmt}).
-
-debug(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {debug, Fmt, Args}).
-
-message(Direction, Channel, MethodRecord, Content) ->
- gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
-
info(Fmt) ->
gen_server:cast(?SERVER, {info, Fmt}).
@@ -86,22 +71,6 @@ init([]) -> {ok, none}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({debug, Fmt}, State) ->
- io:format("debug:: "), io:format(Fmt),
- error_logger:info_msg("debug:: " ++ Fmt),
- {noreply, State};
-handle_cast({debug, Fmt, Args}, State) ->
- io:format("debug:: "), io:format(Fmt, Args),
- error_logger:info_msg("debug:: " ++ Fmt, Args),
- {noreply, State};
-handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
- io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
- {noreply, State};
handle_cast({info, Fmt}, State) ->
error_logger:info_msg(Fmt),
{noreply, State};
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8d69a108..06c5beac 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -207,8 +207,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery {}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -446,7 +450,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MonitoringPids = [begin true = erlang:demonitor(MRef),
+ MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
Pid
end || {Pid, MRef} <- dict:to_list(KS)],
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
@@ -600,7 +604,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case dict:is_key(ChPid, KS) of
false -> ok;
- true -> confirm_sender_death(ChPid)
+ true -> credit_flow:peer_down(ChPid),
+ confirm_sender_death(ChPid)
end,
State.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index e81f8134..923967ea 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -220,7 +220,12 @@ start_listener(Listener, Protocol, Label, OnConnect) ->
start_listener0(Address, Protocol, Label, OnConnect) ->
Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
Protocol, Label, OnConnect),
- {ok,_} = supervisor:start_child(rabbit_sup, Spec).
+ case supervisor:start_child(rabbit_sup, Spec) of
+ {ok, _} -> ok;
+ {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
+ exit({could_not_start_tcp_listener,
+ {rabbit_misc:ntoa(IPAddress), Port}})
+ end.
stop_tcp_listener(Listener) ->
[stop_tcp_listener0(Address) ||
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index fce61129..6e2ddedb 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,8 +27,6 @@
-export([conserve_memory/2, server_properties/1]).
--export([process_channel_frame/5]). %% used by erlang-client
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
@@ -40,10 +38,12 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, conserve_memory,
+ last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, channels]).
+ send_pend, state, last_blocked_by, last_blocked_age,
+ channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
@@ -90,10 +90,6 @@
-spec(system_continue/3 :: (_,_,#v1{}) -> any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
--spec(process_channel_frame/5 ::
- (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
- tuple()) -> tuple()).
-
-endif.
%%--------------------------------------------------------------------------
@@ -220,7 +216,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ conserve_memory = false,
+ last_blocked_by = none,
+ last_blocked_at = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -277,11 +276,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end.
handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
+ recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(State));
+ mainloop(Deb, maybe_close(control_throttle(State)));
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -341,6 +340,9 @@ handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other({bump_credit, Msg}, Deb, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ recvloop(Deb, control_throttle(State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -355,17 +357,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, State = #v1{connection_state = running}) ->
- State#v1{connection_state = blocking};
-internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
- State#v1{connection_state = running};
-internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running};
-internal_conserve_memory(_Conserve, State) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_memory = Mem}) ->
+ case {CS, Mem orelse credit_flow:blocked()} of
+ {running, true} -> State#v1{connection_state = blocking};
+ {blocking, false} -> State#v1{connection_state = running};
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = running};
+ {blocked, true} -> update_last_blocked_by(State);
+ {_, _} -> State
+ end.
+
+maybe_block(State = #v1{connection_state = blocking}) ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ update_last_blocked_by(State#v1{connection_state = blocked,
+ last_blocked_at = erlang:now()});
+maybe_block(State) ->
State.
+update_last_blocked_by(State = #v1{conserve_memory = true}) ->
+ State#v1{last_blocked_by = mem};
+update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+ State#v1{last_blocked_by = flow}.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -387,17 +402,19 @@ handle_dependent_exit(ChPid, Reason, State) ->
{undefined, uncontrolled} ->
exit({abnormal_dependent_exit, ChPid, Reason});
{_Channel, controlled} ->
- maybe_close(State);
+ maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
[self(), Channel, Reason]),
- maybe_close(handle_exception(State, Channel, Reason))
+ maybe_close(handle_exception(control_throttle(State),
+ Channel, Reason))
end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
- {Channel, MRef} -> erase({channel, Channel}),
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
Channel
@@ -485,12 +502,12 @@ handle_frame(Type, Channel, Payload,
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ChPid, FramingState} ->
+ {ChPid, AState} ->
NewAState = process_channel_frame(
- AnalyzedFrame, self(),
- Channel, ChPid, FramingState),
+ AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid, State);
+ post_process_frame(AnalyzedFrame, ChPid,
+ control_throttle(State));
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -504,18 +521,13 @@ handle_frame(Type, Channel, Payload,
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ control_throttle(State);
post_process_frame({method, MethodName, _}, _ChPid,
State = #v1{connection = #connection{
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- case State#v1.connection_state of
- blocking -> ok = rabbit_heartbeat:pause_monitor(
- State#v1.heartbeater),
- State#v1{connection_state = blocked};
- _ -> State
- end;
+ maybe_block(State);
false -> State
end;
post_process_frame(_Frame, _ChPid, State) ->
@@ -687,10 +699,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = control_throttle(
State#v1{connection_state = running,
- connection = NewConnection}),
+ connection = NewConnection,
+ conserve_memory = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -822,6 +835,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
fun ([{_, I}]) -> I end);
i(state, #v1{connection_state = S}) ->
S;
+i(last_blocked_by, #v1{last_blocked_by = By}) ->
+ By;
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
+ infinity;
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
+ timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
i(protocol, #v1{connection = #connection{protocol = none}}) ->
@@ -890,21 +909,20 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, self(),
- Channel, ChPid, AState),
+ NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
State.
-process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+process_channel_frame(Frame, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} -> NewAState;
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
NewAState;
- {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
- Method, Content),
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
NewAState;
- {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ {error, Reason} -> self() ! {channel_exit, Channel,
Reason},
AState
end.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 091b50e4..f6062e06 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -169,12 +169,10 @@ call(Pid, Msg) ->
%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 9a82ac88..e5db4c9f 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -72,8 +72,9 @@ init({IPAddress, Port, SocketOpts,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start ~s on ~s:~p - ~p~n",
- [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p (~s)~n",
+ [Label, rabbit_misc:ntoab(IPAddress), Port,
+ Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.