diff options
author | Steve Powell <steve@rabbitmq.com> | 2012-01-30 14:41:12 +0000 |
---|---|---|
committer | Steve Powell <steve@rabbitmq.com> | 2012-01-30 14:41:12 +0000 |
commit | 892ab6014a50a6d911131c79969119a7cffcd75b (patch) | |
tree | c5856499b6d40d7b2764d38c37313b0399c67750 | |
parent | ba94be375854b4b074dcbffc28e3537b364fc2b8 (diff) | |
parent | aae40a595c494e8c2e4d4c153da95523417a5c13 (diff) | |
download | rabbitmq-server-892ab6014a50a6d911131c79969119a7cffcd75b.tar.gz |
Merge default in
-rw-r--r-- | docs/html-to-website-xml.xsl | 44 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 23 | ||||
-rw-r--r-- | include/rabbit.hrl | 10 | ||||
-rw-r--r-- | src/credit_flow.erl | 121 | ||||
-rw-r--r-- | src/rabbit.erl | 13 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 78 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 157 | ||||
-rw-r--r-- | src/rabbit_log.erl | 33 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 11 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 7 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 104 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 2 | ||||
-rw-r--r-- | src/tcp_listener.erl | 5 |
15 files changed, 382 insertions, 250 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 88aa2e78..d83d5073 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -8,8 +8,6 @@ <xsl:output method="xml" /> -<xsl:template match="*"/> - <!-- Copy every element through --> <xsl:template match="*"> <xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml"> @@ -28,36 +26,30 @@ <head> <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title> </head> - <body> - <doc:div> - <xsl:choose> + <body show-in-this-page="true"> + <xsl:choose> <xsl:when test="document($original)/refentry/refmeta/manvolnum"> - <p> - This is the manual page for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. - </p> - <p> - <a href="../manpages.html">See a list of all manual pages</a>. - </p> + <p> + This is the manual page for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. + </p> + <p> + <a href="../manpages.html">See a list of all manual pages</a>. + </p> </xsl:when> <xsl:otherwise> - <p> - This is the documentation for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. - </p> + <p> + This is the documentation for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. + </p> </xsl:otherwise> - </xsl:choose> - <p> + </xsl:choose> + <p> For more general documentation, please see the - <a href="../admin-guide.html">administrator's guide</a>. - </p> - - <doc:toc class="compact"> - <doc:heading>Table of Contents</doc:heading> - </doc:toc> + <a href="../admin-guide.html">administrator's guide</a>. + </p> - <xsl:apply-templates select="body/div[@class='refentry']"/> - </doc:div> + <xsl:apply-templates select="body/div[@class='refentry']"/> </body> </html> </xsl:template> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 7268f090..4100864e 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1067,10 +1067,26 @@ <listitem><para>The period for which the peer's SSL certificate is valid.</para></listitem> </varlistentry> + + <varlistentry> + <term>last_blocked_by</term> + <listitem><para>The reason for which this connection + was last blocked. One of 'mem' - due to a memory + alarm, 'flow' - due to internal flow control, or + 'none' if the connection was never + blocked.</para></listitem> + </varlistentry> + <varlistentry> + <term>last_blocked_age</term> + <listitem><para>Time, in seconds, since this + connection was last blocked, or + 'infinity'.</para></listitem> + </varlistentry> + <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> @@ -1127,8 +1143,9 @@ </varlistentry> </variablelist> <para> - If no <command>connectioninfoitem</command>s are specified then user, peer - address, peer port and connection state are displayed. + If no <command>connectioninfoitem</command>s are + specified then user, peer address, peer port, time since + flow control and memory block state are displayed. </para> <para role="example-prefix"> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d81b82db..c38eca7c 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -98,13 +98,3 @@ -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). - --ifdef(debug). --define(LOGDEBUG0(F), rabbit_log:debug(F)). --define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). --define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)). --else. --define(LOGDEBUG0(F), ok). --define(LOGDEBUG(F,A), ok). --define(LOGMESSAGE(D,C,M,Co), ok). --endif. diff --git a/src/credit_flow.erl b/src/credit_flow.erl new file mode 100644 index 00000000..7df6c92a --- /dev/null +++ b/src/credit_flow.erl @@ -0,0 +1,121 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(credit_flow). + +%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep +%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more +%% credit by sending a message to the sender. The sender should pass +%% this message in to handle_bump_msg/1. The sender should block when +%% it goes below 0 (check by invoking blocked/0). If a process is both +%% a sender and a receiver it will not grant any more credit to its +%% senders when it is itself blocked - thus the only processes that +%% need to check blocked/0 are ones that read from network sockets. + +-define(MAX_CREDIT, 200). +-define(MORE_CREDIT_AT, 150). + +-export([ack/1, handle_bump_msg/1, blocked/0, send/1]). +-export([peer_down/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-opaque(bump_msg() :: {pid(), non_neg_integer()}). + +-spec(ack/1 :: (pid()) -> 'ok'). +-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). +-spec(blocked/0 :: () -> boolean()). +-spec(send/1 :: (pid()) -> 'ok'). +-spec(peer_down/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +%% There are two "flows" here; of messages and of credit, going in +%% opposite directions. The variable names "From" and "To" refer to +%% the flow of credit, but the function names refer to the flow of +%% messages. This is the clearest I can make it (since the function +%% names form the API and want to make sense externally, while the +%% variable names are used in credit bookkeeping and want to make +%% sense internally). + +ack(To) -> + Credit = + case get({credit_to, To}, ?MAX_CREDIT) of + ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT), + ?MAX_CREDIT; + C -> C - 1 + end, + put({credit_to, To}, Credit). + +handle_bump_msg({From, MoreCredit}) -> + Credit = get({credit_from, From}, 0) + MoreCredit, + put({credit_from, From}, Credit), + case Credit > 0 of + true -> unblock(From), + ok; + false -> ok + end. + +blocked() -> + get(credit_blocked, []) =/= []. + +send(From) -> + Credit = get({credit_from, From}, ?MAX_CREDIT) - 1, + case Credit of + 0 -> block(From); + _ -> ok + end, + put({credit_from, From}, Credit). + +peer_down(Peer) -> + %% In theory we could also remove it from credit_deferred here, but it + %% doesn't really matter; at some point later we will drain + %% credit_deferred and thus send messages into the void... + unblock(Peer), + erase({credit_from, Peer}), + erase({credit_to, Peer}). + +%% -------------------------------------------------------------------------- + +grant(To, Quantity) -> + Msg = {bump_credit, {self(), Quantity}}, + case blocked() of + false -> To ! Msg; + true -> Deferred = get(credit_deferred, []), + put(credit_deferred, [{To, Msg} | Deferred]) + end. + +block(From) -> + put(credit_blocked, [From | get(credit_blocked, [])]). + +unblock(From) -> + NewBlocks = get(credit_blocked, []) -- [From], + put(credit_blocked, NewBlocks), + case NewBlocks of + [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], + erase(credit_deferred); + _ -> ok + end. + +get(Key, Default) -> + case get(Key) of + undefined -> Default; + Value -> Value + end. diff --git a/src/rabbit.erl b/src/rabbit.erl index 9609eb04..3dcd4938 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -191,7 +191,7 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -442,8 +442,7 @@ run_boot_step({StepName, Attributes}) -> [try apply(M,F,A) catch - _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n", - [Reason, erlang:get_stacktrace()]) + _:Reason -> boot_step_error(Reason, erlang:get_stacktrace()) end || {M,F,A} <- MFAs], io:format("done~n"), ok @@ -502,8 +501,14 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +boot_step_error(Reason, Stacktrace) -> + boot_error("Error description:~n ~p~n~n" + "Log files (may contain more information):~n ~s~n ~s~n~n" + "Stack trace:~n ~p~n~n", + [Reason, log_location(kernel), log_location(sasl), Stacktrace]). + boot_error(Format, Args) -> - io:format("BOOT ERROR: " ++ Format, Args), + io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), error_logger:error_msg(Format, Args), timer:sleep(1000), exit({?MODULE, failure_during_boot}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index ca28d686..ec9affa6 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) -> check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> - ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 41e644f2..94a99a49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -120,6 +120,8 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -425,39 +427,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver([], #delivery{mandatory = false, immediate = false}) -> - %% /dev/null optimisation - {routed, []}; - -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. - QPids = qpids(Qs), - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end), - {routed, QPids}; +deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} - end. +deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -549,6 +521,46 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + case Flow of + flow -> [credit_flow:send(QPid) || QPid <- QPids]; + noflow -> ok + end, + delegate:invoke_no_result( + QPids, fun (QPid) -> + gen_server2:cast(QPid, {deliver, Delivery, Flow}) + end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, + _Flow) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. + qpids(Qs) -> lists:append([[QPid | SPids] || #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 161f9787..c21db21b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- init(Q) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), State = #q{q = Q#amqqueue{pid = self()}, @@ -135,7 +134,6 @@ init(Q) -> init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, AckTags, Deliveries, MTC) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -598,6 +596,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> + case get({ch_publisher, DownPid}) of + undefined -> ok; + MRef -> erlang:demonitor(MRef), + erase({ch_publisher, DownPid}), + credit_flow:peer_down(DownPid) + end, case lookup_ch(DownPid) of not_found -> {ok, State}; @@ -1018,8 +1022,17 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + case Flow of + flow -> Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, Sender)); + _ -> ok + end, + credit_flow:ack(Sender); + noflow -> ok + end, noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> @@ -1099,8 +1112,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), - {stop, normal, State}; + true -> {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; @@ -1148,7 +1160,6 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(Info, State) -> - ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f14b2973..603091b1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/10, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -78,6 +78,8 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -111,7 +113,11 @@ do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - gen_server2:cast(Pid, {method, Method, Content}). + gen_server2:cast(Pid, {method, Method, Content, noflow}). + +do_flow(Pid, Method, Content) -> + credit_flow:send(Pid), + gen_server2:cast(Pid, {method, Method, Content, flow}). flush(Pid) -> gen_server2:call(Pid, flush, infinity). @@ -188,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = dict:new(), + queue_monitors = sets:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -244,7 +250,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, State) -> +handle_cast({method, Method, Content, Flow}, + State = #ch{reader_pid = Reader}) -> + case Flow of + flow -> credit_flow:ack(Reader); + noflow -> ok + end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), @@ -299,13 +310,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), + maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), + maybe_incr_redeliver_stats(Redelivered, QPid, State1), rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State3#ch{next_tag = DeliveryTag + 1}); + noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(force_event_refresh, State) -> @@ -315,6 +326,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(timeout, State) -> noreply(State); @@ -327,9 +342,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = - dict:erase(QPid, State3#ch.queue_monitors)}); + sets:del_element(QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -527,7 +543,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> #'channel.flow_ok'{active = false}); _ -> ok end, - demonitor_queue(QPid, State#ch{blocking = Blocking1}) + State#ch{blocking = Blocking1} end. record_confirm(undefined, _, State) -> @@ -565,8 +581,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), case gb_sets:is_empty(MsgSeqNos1) of true -> UQM1 = gb_trees:delete(QPid, UQM), - demonitor_queue( - QPid, State#ch{unconfirmed_qm = UQM1}); + State#ch{unconfirmed_qm = UQM1}; false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), State#ch{unconfirmed_qm = UQM1} end; @@ -672,7 +687,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of - none -> ack(Acked, State1); + none -> ack(Acked, State1), + State1; in_progress -> State1#ch{uncommitted_acks = Acked ++ State1#ch.uncommitted_acks} end}; @@ -696,11 +712,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - State2 = maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), + maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State1), + maybe_incr_redeliver_stats(Redelivered, QPid, State1), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -710,7 +726,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State3#ch{next_tag = DeliveryTag + 1}}; + {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -787,9 +803,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, false -> dict:store(QPid, CTags1, QCons) end end, - NewState = demonitor_queue( - Q, State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = QCons1}), + NewState = State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}, %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -1070,8 +1085,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, uncommitted_acks = TAL}) -> - State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, - State, TMQ))), + ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ)), + State1 = new_tx(State), {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> @@ -1111,10 +1126,7 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> State2 = lists:foldl(fun monitor_queue/2, - State1#ch{blocking = - sets:from_list(QPids)}, - QPids), + QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)}, ok = rabbit_amqqueue:flush_all(QPids, self()), {noreply, State2} end; @@ -1145,31 +1157,12 @@ consumer_monitor(ConsumerTag, end. monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (not dict:is_key(QPid, QMons) andalso - queue_monitor_needed(QPid, State)) of - true -> MRef = erlang:monitor(process, QPid), - State#ch{queue_monitors = dict:store(QPid, MRef, QMons)}; - false -> State - end. - -demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (dict:is_key(QPid, QMons) andalso - not queue_monitor_needed(QPid, State)) of - true -> true = erlang:demonitor(dict:fetch(QPid, QMons)), - State#ch{queue_monitors = dict:erase(QPid, QMons)}; + case not sets:is_element(QPid, QMons) of + true -> erlang:monitor(process, QPid), + State#ch{queue_monitors = sets:add_element(QPid, QMons)}; false -> State end. -queue_monitor_needed(QPid, #ch{queue_consumers = QCons, - blocking = Blocking, - unconfirmed_qm = UQM} = State) -> - StatsEnabled = rabbit_event:stats_level( - State, #ch.stats_timer) =:= fine, - ConsumerMonitored = dict:is_key(QPid, QCons), - QueueBlocked = sets:is_element(QPid, Blocking), - ConfirmMonitored = gb_trees:is_defined(QPid, UQM), - StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1363,22 +1356,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ msg_seq_no = MsgSeqNo}, QNames}, State) -> {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State), + rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), + State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State1). + QPid <- DeliveredQPids]], publish, State2), + State2. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State)); + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{XName, 1}], return_not_delivered, State)); + maybe_incr_stats([{XName, 1}], return_not_delivered, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> @@ -1396,7 +1391,7 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State0#ch{unconfirmed_qm = UQM1}; none -> UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1}) + State0#ch{unconfirmed_qm = UQM1} end end, State#ch{unconfirmed_mq = UMQ1}, QPids). @@ -1420,13 +1415,12 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx_status = none, confirmed = []}) -> State; send_confirms(State = #ch{tx_status = none, confirmed = C}) -> - {MsgSeqNos, State1} = - lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> - {[MsgSeqNo | MSNs], - maybe_incr_stats([{ExchangeName, 1}], confirm, - State0)} - end, {[], State}, lists:append(C)), - send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); + MsgSeqNos = + lists:foldl(fun ({MsgSeqNo, XName}, MSNs) -> + maybe_incr_stats([{XName, 1}], confirm, State), + [MsgSeqNo | MSNs] + end, [], lists:append(C)), + send_confirms(MsgSeqNos, State#ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1506,26 +1500,21 @@ i(Item, _) -> maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_stats([{QPid, 1}], redeliver, State); -maybe_incr_redeliver_stats(_, _, State) -> - State. +maybe_incr_redeliver_stats(_, _, _State) -> + ok. maybe_incr_stats(QXIncs, Measure, State) -> case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> lists:foldl(fun ({QX, Inc}, State0) -> - incr_stats(QX, Inc, Measure, State0) - end, State, QXIncs); - _ -> State + fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + _ -> ok end. -incr_stats({QPid, _} = QX, Inc, Measure, State) -> - update_measures(queue_exchange_stats, QX, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> - update_measures(queue_stats, QPid, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(X, Inc, Measure, State) -> - update_measures(exchange_stats, X, Inc, Measure), - State. +incr_stats({_, _} = QX, Inc, Measure) -> + update_measures(queue_exchange_stats, QX, Inc, Measure); +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 558e0957..8f58f848 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -23,8 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([debug/1, debug/2, message/4, info/1, info/2, - warning/1, warning/2, error/1, error/2]). +-export([info/1, info/2, warning/1, warning/2, error/1, error/2]). -define(SERVER, ?MODULE). @@ -33,8 +32,6 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(debug/1 :: (string()) -> 'ok'). --spec(debug/2 :: (string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -42,8 +39,6 @@ -spec(error/1 :: (string()) -> 'ok'). -spec(error/2 :: (string(), [any()]) -> 'ok'). --spec(message/4 :: (_,_,_,_) -> 'ok'). - -endif. %%---------------------------------------------------------------------------- @@ -51,16 +46,6 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -debug(Fmt) -> - gen_server:cast(?SERVER, {debug, Fmt}). - -debug(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {debug, Fmt, Args}). - -message(Direction, Channel, MethodRecord, Content) -> - gen_server:cast(?SERVER, - {message, Direction, Channel, MethodRecord, Content}). - info(Fmt) -> gen_server:cast(?SERVER, {info, Fmt}). @@ -86,22 +71,6 @@ init([]) -> {ok, none}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({debug, Fmt}, State) -> - io:format("debug:: "), io:format(Fmt), - error_logger:info_msg("debug:: " ++ Fmt), - {noreply, State}; -handle_cast({debug, Fmt, Args}, State) -> - io:format("debug:: "), io:format(Fmt, Args), - error_logger:info_msg("debug:: " ++ Fmt, Args), - {noreply, State}; -handle_cast({message, Direction, Channel, MethodRecord, Content}, State) -> - io:format("~s ch~p ~p~n", - [case Direction of - in -> "-->"; - out -> "<--" end, - Channel, - {MethodRecord, Content}]), - {noreply, State}; handle_cast({info, Fmt}, State) -> error_logger:info_msg(Fmt), {noreply, State}; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8d69a108..06c5beac 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -207,8 +207,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery {}}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> @@ -446,7 +450,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - MonitoringPids = [begin true = erlang:demonitor(MRef), + MonitoringPids = [begin put({ch_publisher, Pid}, MRef), Pid end || {Pid, MRef} <- dict:to_list(KS)], ok = rabbit_mirror_queue_coordinator:ensure_monitoring( @@ -600,7 +604,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case dict:is_key(ChPid, KS) of false -> ok; - true -> confirm_sender_death(ChPid) + true -> credit_flow:peer_down(ChPid), + confirm_sender_death(ChPid) end, State. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index e81f8134..923967ea 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -220,7 +220,12 @@ start_listener(Listener, Protocol, Label, OnConnect) -> start_listener0(Address, Protocol, Label, OnConnect) -> Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(), Protocol, Label, OnConnect), - {ok,_} = supervisor:start_child(rabbit_sup, Spec). + case supervisor:start_child(rabbit_sup, Spec) of + {ok, _} -> ok; + {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address, + exit({could_not_start_tcp_listener, + {rabbit_misc:ntoa(IPAddress), Port}}) + end. stop_tcp_listener(Listener) -> [stop_tcp_listener0(Address) || diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index fce61129..6e2ddedb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -27,8 +27,6 @@ -export([conserve_memory/2, server_properties/1]). --export([process_channel_frame/5]). %% used by erlang-client - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). @@ -40,10 +38,12 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, conserve_memory, + last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, channels]). + send_pend, state, last_blocked_by, last_blocked_age, + channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, @@ -90,10 +90,6 @@ -spec(system_continue/3 :: (_,_,#v1{}) -> any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). --spec(process_channel_frame/5 :: - (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(), - tuple()) -> tuple()). - -endif. %%-------------------------------------------------------------------------- @@ -220,7 +216,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, auth_mechanism = none, - auth_state = none}, + auth_state = none, + conserve_memory = false, + last_blocked_by = none, + last_blocked_at = never}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -277,11 +276,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end. handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, internal_conserve_memory(Conserve, State)); + recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(State)); + mainloop(Deb, maybe_close(control_throttle(State))); handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -341,6 +340,9 @@ handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other({bump_credit, Msg}, Deb, State) -> + credit_flow:handle_bump_msg(Msg), + recvloop(Deb, control_throttle(State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -355,17 +357,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(true, State = #v1{connection_state = running}) -> - State#v1{connection_state = blocking}; -internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> - State#v1{connection_state = running}; -internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater}) -> - ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State#v1{connection_state = running}; -internal_conserve_memory(_Conserve, State) -> +control_throttle(State = #v1{connection_state = CS, + conserve_memory = Mem}) -> + case {CS, Mem orelse credit_flow:blocked()} of + {running, true} -> State#v1{connection_state = blocking}; + {blocking, false} -> State#v1{connection_state = running}; + {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( + State#v1.heartbeater), + State#v1{connection_state = running}; + {blocked, true} -> update_last_blocked_by(State); + {_, _} -> State + end. + +maybe_block(State = #v1{connection_state = blocking}) -> + ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + update_last_blocked_by(State#v1{connection_state = blocked, + last_blocked_at = erlang:now()}); +maybe_block(State) -> State. +update_last_blocked_by(State = #v1{conserve_memory = true}) -> + State#v1{last_blocked_by = mem}; +update_last_blocked_by(State = #v1{conserve_memory = false}) -> + State#v1{last_blocked_by = flow}. + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -387,17 +402,19 @@ handle_dependent_exit(ChPid, Reason, State) -> {undefined, uncontrolled} -> exit({abnormal_dependent_exit, ChPid, Reason}); {_Channel, controlled} -> - maybe_close(State); + maybe_close(control_throttle(State)); {Channel, uncontrolled} -> rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", [self(), Channel, Reason]), - maybe_close(handle_exception(State, Channel, Reason)) + maybe_close(handle_exception(control_throttle(State), + Channel, Reason)) end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of undefined -> undefined; - {Channel, MRef} -> erase({channel, Channel}), + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), Channel @@ -485,12 +502,12 @@ handle_frame(Type, Channel, Payload, heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ChPid, FramingState} -> + {ChPid, AState} -> NewAState = process_channel_frame( - AnalyzedFrame, self(), - Channel, ChPid, FramingState), + AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(AnalyzedFrame, ChPid, State); + post_process_frame(AnalyzedFrame, ChPid, + control_throttle(State)); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -504,18 +521,13 @@ handle_frame(Type, Channel, Payload, post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - State; + control_throttle(State); post_process_frame({method, MethodName, _}, _ChPid, State = #v1{connection = #connection{ protocol = Protocol}}) -> case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), - case State#v1.connection_state of - blocking -> ok = rabbit_heartbeat:pause_monitor( - State#v1.heartbeater), - State#v1{connection_state = blocked}; - _ -> State - end; + maybe_block(State); false -> State end; post_process_frame(_Frame, _ChPid, State) -> @@ -687,10 +699,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - State1 = internal_conserve_memory( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State1 = control_throttle( State#v1{connection_state = running, - connection = NewConnection}), + connection = NewConnection, + conserve_memory = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -822,6 +835,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; fun ([{_, I}]) -> I end); i(state, #v1{connection_state = S}) -> S; +i(last_blocked_by, #v1{last_blocked_by = By}) -> + By; +i(last_blocked_age, #v1{last_blocked_at = never}) -> + infinity; +i(last_blocked_age, #v1{last_blocked_at = T}) -> + timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); i(protocol, #v1{connection = #connection{protocol = none}}) -> @@ -890,21 +909,20 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - NewAState = process_channel_frame(AnalyzedFrame, self(), - Channel, ChPid, AState), + NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), put({ch_pid, ChPid}, {Channel, MRef}), State. -process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> +process_channel_frame(Frame, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> NewAState; {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), NewAState; - {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, - Method, Content), + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), NewAState; - {error, Reason} -> ErrPid ! {channel_exit, Channel, + {error, Reason} -> self() ! {channel_exit, Channel, Reason}, AState end. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 091b50e4..f6062e06 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -169,12 +169,10 @@ call(Pid, Msg) -> %%--------------------------------------------------------------------------- assemble_frame(Channel, MethodRecord, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, none), rabbit_binary_generator:build_simple_method_frame( Channel, MethodRecord, Protocol). assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 9a82ac88..e5db4c9f 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -72,8 +72,9 @@ init({IPAddress, Port, SocketOpts, label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start ~s on ~s:~p - ~p~n", - [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p (~s)~n", + [Label, rabbit_misc:ntoab(IPAddress), Port, + Reason, inet:format_error(Reason)]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. |