diff options
author | Emile Joubert <emile@rabbitmq.com> | 2014-01-23 15:53:52 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2014-01-23 15:53:52 +0000 |
commit | 660df83677206e69ab5a4e4936531345110f9765 (patch) | |
tree | 0112f03fdad3ca5e5e80267a75601b0b4bd76768 | |
parent | 7456792ec74cdf72afa7ddc54935b22e933f4316 (diff) | |
parent | 0148ab85facdb6b827d4305b1ceba44e532b2c7e (diff) | |
download | rabbitmq-server-660df83677206e69ab5a4e4936531345110f9765.tar.gz |
Merged stable to default
39 files changed, 1477 insertions, 1059 deletions
@@ -187,7 +187,7 @@ def genErl(spec): elif type == 'table': return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary' - def genFieldPostprocessing(packed): + def genFieldPostprocessing(packed, hasContent): for f in packed: type = erlType(f.domain) if type == 'bit': @@ -199,6 +199,10 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) + # We skip the check on content-bearing methods for + # speed. This is a sanity check, not a security thing. + elif type == 'shortstr' and not hasContent: + print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index) else: pass @@ -214,7 +218,7 @@ def genErl(spec): restSeparator = '' recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments)) print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern) - genFieldPostprocessing(packedFields) + genFieldPostprocessing(packedFields, m.hasContent) print " %s;" % (recordConstructorExpr,) def genDecodeProperties(c): diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 91402649..c0d6cc70 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -138,6 +138,11 @@ %% %% {frame_max, 131072}, + %% Set the max permissible number of channels per connection. + %% 0 means "no limit". + %% + %% {channel_max, 128}, + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6ec7ee07..d19acd00 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1135,6 +1135,13 @@ <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> + <term>consumer_utilisation</term> + <listitem><para>Fraction of the time (between 0.0 and 1.0) + that the queue is able to immediately deliver messages to + consumers. This can be less than 1.0 if consumers are limited + by network congestion or prefetch count.</para></listitem> + </varlistentry> + <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> @@ -1390,24 +1397,9 @@ </varlistentry> <varlistentry> - <term>last_blocked_by</term> - <listitem><para>The reason for which this connection - was last blocked. One of 'resource' - due to a memory - or disk alarm, 'flow' - due to internal flow control, or - 'none' if the connection was never - blocked.</para></listitem> - </varlistentry> - <varlistentry> - <term>last_blocked_age</term> - <listitem><para>Time, in seconds, since this - connection was last blocked, or - 'infinity'.</para></listitem> - </varlistentry> - - <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> @@ -1438,6 +1430,10 @@ <listitem><para>Maximum frame size (bytes).</para></listitem> </varlistentry> <varlistentry> + <term>channel_max</term> + <listitem><para>Maximum number of channels on this connection.</para></listitem> + </varlistentry> + <varlistentry> <term>client_properties</term> <listitem><para>Informational properties transmitted by the client during connection establishment.</para></listitem> @@ -1563,14 +1559,6 @@ <term>prefetch_count</term> <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> </varlistentry> - <varlistentry> - <term>client_flow_blocked</term> - <listitem><para>True if the client issued a - <command>channel.flow{active=false}</command> - command, blocking the server from delivering - messages to the channel's consumers. - </para></listitem> - </varlistentry> </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index f0fee96a..29f06e79 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -25,6 +25,7 @@ %% 0 ("no limit") would make a better default, but that %% breaks the QPid Java client {frame_max, 131072}, + {channel_max, 0}, {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0f1b7a50..8b42cdea 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -118,3 +118,5 @@ %% to allow plenty of leeway for the #basic_message{} and #content{} %% wrapping the message body). -define(MAX_MSG_SIZE, 2147383648). + +-define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)). diff --git a/src/credit_flow.erl b/src/credit_flow.erl index d48d649e..39a257ac 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -30,7 +30,7 @@ -define(DEFAULT_CREDIT, {200, 50}). --export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of _ -> true end. +state() -> case blocked() of + true -> flow; + false -> case get(credit_blocked_at) of + undefined -> running; + B -> Diff = timer:now_diff(erlang:now(), B), + case Diff < 5000000 of + true -> flow; + false -> running + end + end + end. + peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it %% doesn't really matter; at some point later we will drain @@ -128,7 +140,12 @@ grant(To, Quantity) -> true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred]) end. -block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). +block(From) -> + case blocked() of + false -> put(credit_blocked_at, erlang:now()); + true -> ok + end, + ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). unblock(From) -> ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), @@ -546,6 +546,7 @@ forget_group(GroupName) -> ok. init([GroupName, Module, Args, TxnFun]) -> + put(process_name, {?MODULE, GroupName}), {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index d54c2a8d..19171659 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), lists:foldl( - fun(Module, {refused, _, _}) -> - case Module:check_user_login(Username, AuthProps) of - {error, E} -> - {refused, "~s failed authenticating ~s: ~p~n", - [Module, Username, E]}; - Else -> - Else + fun ({ModN, ModZ}, {refused, _, _}) -> + %% Different modules for authN vs authZ. So authenticate + %% with authN module, then if that succeeds do + %% passwordless (i.e pre-authenticated) login with authZ + %% module, and use the #user{} the latter gives us. + case try_login(ModN, Username, AuthProps) of + {ok, _} -> try_login(ModZ, Username, []); + Else -> Else end; - (_, {ok, User}) -> + (Mod, {refused, _, _}) -> + %% Same module for authN and authZ. Just take the result + %% it gives us + try_login(Mod, Username, AuthProps); + (_, {ok, User}) -> + %% We've successfully authenticated. Skip to the end... {ok, User} end, {refused, "No modules checked '~s'", [Username]}, Modules). +try_login(Module, Username, AuthProps) -> + case Module:check_user_login(Username, AuthProps) of + {error, E} -> {refused, "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + Else -> Else + end. + check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> check_access( diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 282113a4..773ad60d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,8 +26,8 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). --export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). +-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). @@ -150,9 +150,9 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/10 :: +-spec(basic_consume/9 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any()) + rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -160,7 +160,6 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(resume/2 :: (pid(), pid()) -> 'ok'). --spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -426,7 +425,7 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_max_length_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}]. @@ -436,7 +435,7 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. -check_max_length_arg({Type, Val}, Args) -> +check_non_neg_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val >= 0 -> ok; ok -> {error, {value_negative, Val}}; @@ -549,8 +548,8 @@ requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). -reject(QPid, MsgIds, Requeue, ChPid) -> - delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}). +reject(QPid, Requeue, MsgIds, ChPid) -> + delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}). notify_down_all(QPids, ChPid) -> {_, Bads} = delegate:call(QPids, {notify_down, ChPid}), @@ -571,13 +570,11 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). -basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, - LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) -> - ok = check_consume_arguments(QName, OtherArgs), +basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, + LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) -> + ok = check_consume_arguments(QName, Args), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, - OkMsg}). + ConsumerTag, ExclusiveConsume, Args, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -604,8 +601,6 @@ notify_sent_queue_down(QPid) -> resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). -flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). - internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), %% this 'guarded' delete prevents unnecessary writes to the mnesia diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 96851882..3322b3d1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,7 +20,6 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -38,7 +37,7 @@ has_had_consumers, backing_queue, backing_queue_state, - active_consumers, + consumers, expires, sync_timer_ref, rate_timer_ref, @@ -56,21 +55,6 @@ status }). --record(consumer, {tag, ack_required, args}). - -%% These are held in our process dictionary --record(cr, {ch_pid, - monitor_ref, - acktags, - consumer_count, - %% Queue of {ChPid, #consumer{}} for consumers which have - %% been blocked for any reason - blocked_consumers, - %% The limiter itself - limiter, - %% Internal flow control for queue -> writer - unsent_message_count}). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -95,11 +79,12 @@ messages_unacknowledged, messages, consumers, + consumer_utilisation, memory, slave_pids, synchronised_slave_pids, backing_queue_status, - status + state ]). -define(CREATION_EVENT_KEYS, @@ -122,6 +107,7 @@ info_keys() -> ?INFO_KEYS. init(Q) -> process_flag(trap_exit, true), + ?store_proc_name(Q#amqqueue.name), {ok, init_state(Q#amqqueue{pid = self()}), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -141,14 +127,14 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3 = lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries), - notify_decorators(startup, [], State3), + notify_decorators(startup, State3), State3. init_state(Q) -> State = #q{q = Q, exclusive_consumer = none, has_had_consumers = false, - active_consumers = priority_queue:new(), + consumers = rabbit_queue_consumers:new(), senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, @@ -203,7 +189,7 @@ declare(Recover, From, State = #q{q = Q, State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), - notify_decorators(startup, [], State), + notify_decorators(startup, State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -228,18 +214,17 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -notify_decorators(Event, Props, State) when Event =:= startup; - Event =:= shutdown -> - decorator_callback(qname(State), Event, Props); +maybe_notify_decorators(false, State) -> State; +maybe_notify_decorators(true, State) -> notify_decorators(State), State. + +notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []). -notify_decorators(Event, Props, State = #q{active_consumers = ACs, - backing_queue = BQ, - backing_queue_state = BQS}) -> - P = priority_queue:highest(ACs), - decorator_callback(qname(State), notify, - [Event, [{max_active_consumer_priority, P}, - {is_empty, BQ:is_empty(BQS)} | - Props]]). +notify_decorators(State = #q{consumers = Consumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> + P = rabbit_queue_consumers:max_active_priority(Consumers), + decorator_callback(qname(State), consumer_state_changed, + [P, BQ:is_empty(BQS)]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -313,7 +298,7 @@ init_max_length(MaxLen, State) -> State1. terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue_state = BQS} = + State1 = #q{backing_queue_state = BQS, consumers = Consumers} = lists:foldl(fun (F, S) -> F(S) end, State, [fun stop_sync_timer/1, fun stop_rate_timer/1, @@ -323,9 +308,10 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), - notify_decorators(shutdown, [], State), + notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _} <- consumers(State1)], + {Ch, CTag, _, _} <- + rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -408,131 +394,19 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(State = #q{active_consumers = AC}) -> - true = (priority_queue:is_empty(AC) orelse is_empty(State)). +assert_invariant(State = #q{consumers = Consumers}) -> + true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). -lookup_ch(ChPid) -> - case get({ch, ChPid}) of - undefined -> not_found; - C -> C - end. - -ch_record(ChPid, LimiterPid) -> - Key = {ch, ChPid}, - case get(Key) of - undefined -> MonitorRef = erlang:monitor(process, ChPid), - Limiter = rabbit_limiter:client(LimiterPid), - C = #cr{ch_pid = ChPid, - monitor_ref = MonitorRef, - acktags = queue:new(), - consumer_count = 0, - blocked_consumers = priority_queue:new(), - limiter = Limiter, - unsent_message_count = 0}, - put(Key, C), - C; - C = #cr{} -> C - end. - -update_ch_record(C = #cr{consumer_count = ConsumerCount, - acktags = ChAckTags, - unsent_message_count = UnsentMessageCount}) -> - case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of - {true, 0, 0} -> ok = erase_ch_record(C); - _ -> ok = store_ch_record(C) - end, - C. - -store_ch_record(C = #cr{ch_pid = ChPid}) -> - put({ch, ChPid}, C), - ok. - -erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> - erlang:demonitor(MonitorRef), - erase({ch, ChPid}), - ok. - -all_ch_record() -> [C || {{ch, _}, C} <- get()]. - -block_consumer(C = #cr{blocked_consumers = Blocked}, - {_ChPid, #consumer{tag = CTag}} = QEntry, State) -> - update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}), - notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State). - -is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> - Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). - maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> notify_decorators(queue_empty, [], State), - [send_drained(C) || C <- all_ch_record()]; + true -> notify_decorators(State), + rabbit_queue_consumers:send_drained(); false -> ok end, State. -send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> - case rabbit_limiter:drained(Limiter) of - {[], Limiter} -> ok; - {CTagCredit, Limiter2} -> rabbit_channel:send_drained( - ChPid, CTagCredit), - update_ch_record(C#cr{limiter = Limiter2}) - end. - -deliver_msgs_to_consumers(_DeliverFun, true, State) -> - {true, State}; -deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers}) -> - case priority_queue:out_p(ActiveConsumers) of - {empty, _} -> - {false, State}; - {{value, QEntry, Priority}, Tail} -> - {Stop, State1} = deliver_msg_to_consumer( - DeliverFun, QEntry, Priority, - State#q{active_consumers = Tail}), - deliver_msgs_to_consumers(DeliverFun, Stop, State1) - end. - -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) -> - C = lookup_ch(ChPid), - case is_ch_blocked(C) of - true -> block_consumer(C, E, State), - {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, - Consumer#consumer.ack_required, - Consumer#consumer.tag) of - {suspend, Limiter} -> - block_consumer(C#cr{limiter = Limiter}, E, State), - {false, State}; - {continue, Limiter} -> - AC1 = priority_queue:in(E, Priority, - State#q.active_consumers), - deliver_msg_to_consumer0( - DeliverFun, Consumer, C#cr{limiter = Limiter}, - State#q{active_consumers = AC1}) - end - end. - -deliver_msg_to_consumer0(DeliverFun, - #consumer{tag = ConsumerTag, - ack_required = AckRequired}, - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - unsent_message_count = Count}, - State = #q{q = #amqqueue{name = QName}}) -> - {{Message, IsDelivered, AckTag}, Stop, State1} = - DeliverFun(AckRequired, State), - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, - {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> queue:in(AckTag, ChAckTags); - false -> ChAckTags - end, - update_ch_record(C#cr{acktags = ChAckTags1, - unsent_message_count = Count + 1}), - {Stop, State1}. - confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -578,56 +452,68 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> - {_Active, State3} = deliver_msgs_to_consumers( - fun(AckRequired, State1) -> - {Result, State2} = fetch(AckRequired, State1), - {Result, is_empty(State2), State2} - end, is_empty(State), State), - State3. +run_message_queue(State) -> run_message_queue(false, State). -add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> - Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of - {_, P} -> P; - _ -> 0 - end, - priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers). +run_message_queue(ActiveConsumersChanged, State) -> + case is_empty(State) of + true -> maybe_notify_decorators(ActiveConsumersChanged, State); + false -> case rabbit_queue_consumers:deliver( + fun(AckRequired) -> fetch(AckRequired, State) end, + qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged1, State1, Consumers} -> + run_message_queue( + ActiveConsumersChanged or ActiveConsumersChanged1, + State1#q{consumers = Consumers}); + {undelivered, ActiveConsumersChanged1, Consumers} -> + maybe_notify_decorators( + ActiveConsumersChanged or ActiveConsumersChanged1, + State#q{consumers = Consumers}) + end + end. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case IsDuplicate of - false -> deliver_msgs_to_consumers( - fun (true, State2 = #q{backing_queue_state = BQS2}) -> - true = BQ:is_empty(BQS2), - {AckTag, BQS3} = BQ:publish_delivered( - Message, Props, SenderPid, BQS2), - {{Message, Delivered, AckTag}, - true, State2#q{backing_queue_state = BQS3}}; - (false, State2) -> - {{Message, Delivered, undefined}, - true, discard(Delivery, State2)} - end, false, State1); - true -> {true, State1} + case rabbit_queue_consumers:deliver( + fun (true) -> true = BQ:is_empty(BQS), + {AckTag, BQS1} = BQ:publish_delivered( + Message, Props, SenderPid, BQS), + {{Message, Delivered, AckTag}, + State#q{backing_queue_state = BQS1}}; + (false) -> {{Message, Delivered, undefined}, + discard(Delivery, State)} + end, qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, maybe_notify_decorators( + ActiveConsumersChanged, + State1#q{consumers = Consumers})}; + {undelivered, ActiveConsumersChanged, Consumers} -> + {undelivered, maybe_notify_decorators( + ActiveConsumersChanged, + State#q{consumers = Consumers})} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, - Delivered, State) -> + Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), - case attempt_delivery(Delivery, Props, Delivered, State1) of - {true, State2} -> + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State2 = State1#q{backing_queue_state = BQS1}, + case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, + State2) of + true -> State2; + {delivered, State3} -> + State3; %% The next one is an optimisation - {false, State2 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - {Dropped, State3 = #q{backing_queue_state = BQS2}} = - maybe_drop_head(State2#q{backing_queue_state = BQS1}), - QLen = BQ:len(BQS2), + {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> + discard(Delivery, State3); + {undelivered, State3 = #q{backing_queue_state = BQS2}} -> + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), + {Dropped, State4 = #q{backing_queue_state = BQS4}} = + maybe_drop_head(State3#q{backing_queue_state = BQS3}), + QLen = BQ:len(BQS4), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -636,9 +522,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State3; - {true, true, undefined} -> State3; - {_, _, _} -> drop_expired_msgs(State3) + {false, false, _} -> State4; + {true, true, undefined} -> State4; + {_, _, _} -> drop_expired_msgs(State4) end end. @@ -688,63 +574,18 @@ requeue(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end). -remove_consumer(ChPid, ConsumerTag, Queue) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) - end, Queue). - -remove_consumers(ChPid, Queue, QName) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag, QName), - false; - (_) -> - true - end, Queue). - -channel_consumers(ChPid, Queue) -> - priority_queue:fold( - fun ({CP, #consumer{tag = CTag}}, _, Acc) when CP =:= ChPid -> - [CTag | Acc]; - (_, _, Acc) -> - Acc - end, [], Queue). - -possibly_unblock(State, ChPid, Update) -> - case lookup_ch(ChPid) of - not_found -> State; - C -> C1 = Update(C), - case is_ch_blocked(C) andalso not is_ch_blocked(C1) of - false -> update_ch_record(C1), - State; - true -> unblock(State, C1) - end - end. - -unblock(State, C = #cr{limiter = Limiter}) -> - case lists:partition( - fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> - rabbit_limiter:is_consumer_blocked(Limiter, CTag) - end, priority_queue:to_list(C#cr.blocked_consumers)) of - {_, []} -> - update_ch_record(C), - State; - {Blocked, Unblocked} -> - BlockedQ = priority_queue:from_list(Blocked), - UnblockedQ = priority_queue:from_list(Unblocked), - update_ch_record(C#cr{blocked_consumers = BlockedQ}), - AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), - State1 = State#q{active_consumers = AC1}, - [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State1) || - {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State1) +possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> + case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of + unchanged -> State; + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{active_consumers = AC, +handle_ch_down(DownPid, State = #q{consumers = Consumers, exclusive_consumer = Holder, senders = Senders}) -> State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of @@ -752,29 +593,22 @@ handle_ch_down(DownPid, State = #q{active_consumers = AC, true -> credit_flow:peer_down(DownPid), pmon:demonitor(DownPid, Senders) end}, - case lookup_ch(DownPid) of + case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of not_found -> {ok, State1}; - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - blocked_consumers = Blocked} -> - QName = qname(State), - AC1 = remove_consumers(ChPid, AC, QName), - _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission - ok = erase_ch_record(C), + {ChAckTags, ChCTags, Consumers1} -> + QName = qname(State1), + [emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags], Holder1 = case Holder of {DownPid, _} -> none; Other -> Other end, - State2 = State1#q{active_consumers = AC1, + State2 = State1#q{consumers = Consumers1, exclusive_consumer = Holder1}, - [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || - CTag <- channel_consumers(ChPid, AC)], - [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || - CTag <- channel_consumers(ChPid, Blocked)], + notify_decorators(State2), case should_auto_delete(State2) of true -> {stop, State2}; - false -> {ok, requeue_and_run(queue:to_list(ChAckTags), + false -> {ok, requeue_and_run(ChAckTags, ensure_expiry_timer(State2))} end end. @@ -789,10 +623,7 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -consumer_count() -> - lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). - -is_unused(_State) -> consumer_count() == 0. +is_unused(_State) -> rabbit_queue_consumers:count() == 0. maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -804,23 +635,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ, State#q{backing_queue_state = BQ:timeout(BQS)}. subtract_acks(ChPid, AckTags, State, Fun) -> - case lookup_ch(ChPid) of - not_found -> - State; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - Fun(State) - end. - -subtract_acks([], [], AckQ) -> - AckQ; -subtract_acks([], Prefix, AckQ) -> - queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> - case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of + not_found -> State; + ok -> Fun(State) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> @@ -900,117 +717,17 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, QName = qname(State), {Res, Acks1, BQS1} = Fun(fun (Msg, AckTag, Acks) -> - dead_letter_publish(Msg, Reason, X, RK, QName), + rabbit_dead_letter:publish(Msg, Reason, X, RK, QName), [AckTag | Acks] end, [], BQS), {_Guids, BQS2} = BQ:ack(Acks1, BQS1), {Res, State#q{backing_queue_state = BQS2}}. -dead_letter_publish(Msg, Reason, X, RK, QName) -> - DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, undefined), - {Queues, Cycles} = detect_dead_letter_cycles( - Reason, DLMsg, rabbit_exchange:route(X, Delivery)), - lists:foreach(fun log_cycle_once/1, Cycles), - rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), - ok. - stop(State) -> stop(noreply, State). stop(noreply, State) -> {stop, normal, State}; stop(Reply, State) -> {stop, normal, Reply, State}. - -detect_dead_letter_cycles(expired, - #basic_message{content = Content}, Queues) -> - #content{properties = #'P_basic'{headers = Headers}} = - rabbit_binary_parser:ensure_content_decoded(Content), - NoCycles = {Queues, []}, - case Headers of - undefined -> - NoCycles; - _ -> - case rabbit_misc:table_lookup(Headers, <<"x-death">>) of - {array, Deaths} -> - {Cycling, NotCycling} = - lists:partition( - fun (#resource{name = Queue}) -> - is_dead_letter_cycle(Queue, Deaths) - end, Queues), - OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || - {table, D} <- Deaths], - OldQueues1 = [QName || {longstr, QName} <- OldQueues], - {NotCycling, [[QName | OldQueues1] || - #resource{name = QName} <- Cycling]}; - _ -> - NoCycles - end - end; -detect_dead_letter_cycles(_Reason, _Msg, Queues) -> - {Queues, []}. - -is_dead_letter_cycle(Queue, Deaths) -> - {Cycle, Rest} = - lists:splitwith( - fun ({table, D}) -> - {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); - (_) -> - true - end, Deaths), - %% Is there a cycle, and if so, is it entirely due to expiry? - case Rest of - [] -> false; - [H|_] -> lists:all( - fun ({table, D}) -> - {longstr, <<"expired">>} =:= - rabbit_misc:table_lookup(D, <<"reason">>); - (_) -> - false - end, Cycle ++ [H]) - end. - -make_dead_letter_msg(Msg = #basic_message{content = Content, - exchange_name = Exchange, - routing_keys = RoutingKeys}, - Reason, DLX, RK, #resource{name = QName}) -> - {DeathRoutingKeys, HeadersFun1} = - case RK of - undefined -> {RoutingKeys, fun (H) -> H end}; - _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} - end, - ReasonBin = list_to_binary(atom_to_list(Reason)), - TimeSec = rabbit_misc:now_ms() div 1000, - PerMsgTTL = per_msg_ttl_header(Content#content.properties), - HeadersFun2 = - fun (Headers) -> - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - RKs1 = [{longstr, Key} || Key <- RKs], - Info = [{<<"reason">>, longstr, ReasonBin}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, TimeSec}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, - HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, - Info, Headers)) - end, - Content1 = #content{properties = Props} = - rabbit_basic:map_headers(HeadersFun2, Content), - Content2 = Content1#content{properties = - Props#'P_basic'{expiration = undefined}}, - Msg#basic_message{exchange_name = DLX, - id = rabbit_guid:gen(), - routing_keys = DeathRoutingKeys, - content = Content2}. - -per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> - []; -per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> - [{<<"original-expiration">>, longstr, Expiration}]; -per_msg_ttl_header(_) -> - []. - now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1041,12 +758,17 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]); + rabbit_queue_consumers:unacknowledged_message_count(); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); i(consumers, _) -> - consumer_count(); + rabbit_queue_consumers:count(); +i(consumer_utilisation, #q{consumers = Consumers}) -> + case rabbit_queue_consumers:count() of + 0 -> ''; + _ -> rabbit_queue_consumers:utilisation(Consumers) + end; i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1064,29 +786,21 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; -i(status, #q{status = Status}) -> - Status; +i(state, #q{status = running}) -> credit_flow:state(); +i(state, #q{status = State}) -> State; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> throw({bad_argument, Item}). -consumers(#q{active_consumers = ActiveConsumers}) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, - consumers(ActiveConsumers, []), all_ch_record()). - -consumers(Consumers, Acc) -> - priority_queue:fold( - fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] - end, Acc, Consumers). - emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). + ExtraKs = [K || {K, _} <- Extra], + Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + not lists:member(K, ExtraKs)], + rabbit_event:notify(queue_stats, Extra ++ Infos). emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> rabbit_event:notify(consumer_created, @@ -1167,8 +881,8 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, State) -> - reply(consumers(State), State); +handle_call(consumers, _From, State = #q{consumers = Consumers}) -> + reply(rabbit_queue_consumers:all(Consumers), State); handle_call({deliver, Delivery, Delivered}, From, State) -> %% Synchronous, "mandatory" deliver mode. @@ -1196,10 +910,8 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, {{Message, IsDelivered, AckTag}, #q{backing_queue = BQ, backing_queue_state = BQS} = State2} -> case AckRequired of - true -> C = #cr{acktags = ChAckTags} = - ch_record(ChPid, LimiterPid), - ChAckTags1 = queue:in(AckTag, ChAckTags), - update_ch_record(C#cr{acktags = ChAckTags1}); + true -> ok = rabbit_queue_consumers:record_ack( + ChPid, LimiterPid, AckTag); false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, @@ -1207,78 +919,45 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, - _From, State = #q{active_consumers = AC, + ConsumerTag, ExclusiveConsume, Args, OkMsg}, + _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); - ok -> C = #cr{consumer_count = Count, - limiter = Limiter} = - ch_record(ChPid, LimiterPid), - Limiter1 = case LimiterActive of - true -> rabbit_limiter:activate(Limiter); - false -> Limiter - end, - Limiter2 = case CreditArgs of - none -> Limiter1; - {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, - Crd, Drain) - end, - C1 = update_ch_record(C#cr{consumer_count = Count + 1, - limiter = Limiter2}), - case is_empty(State) of - true -> send_drained(C1); - false -> ok - end, - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck, - args = OtherArgs}, - AC1 = add_consumer({ChPid, Consumer}, AC), + ok -> Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + Args, is_empty(State), Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder end, - State1 = State#q{active_consumers = AC1, - has_had_consumers = true, + State1 = State#q{consumers = Consumers1, + has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), OtherArgs), - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State1), + not NoAck, qname(State1), Args), + notify_decorators(State1), reply(ok, run_message_queue(State1)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{active_consumers = AC, + State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), - case lookup_ch(ChPid) of + case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of not_found -> reply(ok, State); - C = #cr{consumer_count = Count, - limiter = Limiter, - blocked_consumers = Blocked} -> - AC1 = remove_consumer(ChPid, ConsumerTag, AC), - Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - Limiter1 = case Count of - 1 -> rabbit_limiter:deactivate(Limiter); - _ -> Limiter - end, - Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), - update_ch_record(C#cr{consumer_count = Count - 1, - limiter = Limiter2, - blocked_consumers = Blocked1}), + Consumers1 -> Holder1 = case Holder of {ChPid, ConsumerTag} -> none; _ -> Holder end, - State1 = State#q{active_consumers = AC1, + State1 = State#q{consumers = Consumers1, exclusive_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), - notify_decorators( - basic_cancel, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1288,7 +967,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_expiry_timer(State), - reply({ok, BQ:len(BQS), consumer_count()}, State1); + reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -1340,10 +1019,11 @@ handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State); handle_call(force_event_refresh, _From, - State = #q{exclusive_consumer = Exclusive}) -> + State = #q{consumers = Consumers, + exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), QName = qname(State), - AllConsumers = consumers(State), + AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( Ch, CTag, false, AckRequired, QName, Args) || @@ -1372,10 +1052,10 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, handle_cast({ack, AckTags, ChPid}, State) -> noreply(ack(AckTags, ChPid, State)); -handle_cast({reject, AckTags, true, ChPid}, State) -> +handle_cast({reject, true, AckTags, ChPid}, State) -> noreply(requeue(AckTags, ChPid, State)); -handle_cast({reject, AckTags, false, ChPid}, State) -> +handle_cast({reject, false, AckTags, ChPid}, State) -> noreply(with_dlx( State#q.dlx, fun (X) -> subtract_acks(ChPid, AckTags, State, @@ -1389,29 +1069,16 @@ handle_cast(delete_immediately, State) -> stop(State); handle_cast({resume, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{limiter = Limiter}) -> - C#cr{limiter = rabbit_limiter:resume(Limiter)} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:resume_fun(), + ChPid, State)); handle_cast({notify_sent, ChPid, Credit}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - Credit} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:notify_sent_fun(Credit), + ChPid, State)); handle_cast({activate_limit, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{limiter = Limiter}) -> - C#cr{limiter = rabbit_limiter:activate(Limiter)} - end)); - -handle_cast({flush, ChPid}, State) -> - ok = rabbit_channel:flushed(ChPid, self()), - noreply(State); + noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(), + ChPid, State)); handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1440,25 +1107,21 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS1}); handle_cast({credit, ChPid, CTag, Credit, Drain}, - State = #q{backing_queue = BQ, + State = #q{consumers = Consumers, + backing_queue = BQ, backing_queue_state = BQS}) -> Len = BQ:len(BQS), rabbit_channel:send_credit_reply(ChPid, Len), - C = #cr{limiter = Limiter} = lookup_ch(ChPid), - C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, - noreply(case Drain andalso Len == 0 of - true -> update_ch_record(C1), - send_drained(C1), - State; - false -> case is_ch_blocked(C1) of - true -> update_ch_record(C1), - State; - false -> unblock(State, C1) - end - end); + noreply( + case rabbit_queue_consumers:credit(Len == 0, Credit, Drain, ChPid, CTag, + Consumers) of + unchanged -> State; + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, State1) + end); handle_cast(notify_decorators, State) -> - notify_decorators(refresh, [], State), + notify_decorators(State), noreply(State); handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> @@ -1549,20 +1212,10 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( State, #q.stats_timer, - fun () -> emit_stats(State, [{idle_since, now()}]) end), + fun () -> emit_stats(State, [{idle_since, now()}, + {consumer_utilisation, ''}]) end), State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, #q.stats_timer), {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). - -log_cycle_once(Queues) -> - Key = {queue_cycle, Queues}, - case get(Key) of - true -> ok; - undefined -> rabbit_log:warning( - "Message dropped. Dead-letter queues cycle detected" ++ - ": ~p~nThis cycle will NOT be reported again.~n", - [Queues]), - put(Key, true) - end. diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index ae5bbf51..83f68ed3 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -119,52 +119,51 @@ create_frame(TypeInt, ChannelInt, Payload) -> table_field_to_binary({FName, T, V}) -> [short_string_to_binary(FName) | field_value_to_binary(T, V)]. -field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)]; -field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>]; +field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)]; +field_value_to_binary(signedint, V) -> [$I, <<V:32/signed>>]; field_value_to_binary(decimal, V) -> {Before, After} = V, - ["D", Before, <<After:32>>]; -field_value_to_binary(timestamp, V) -> ["T", <<V:64>>]; -field_value_to_binary(table, V) -> ["F", table_to_binary(V)]; -field_value_to_binary(array, V) -> ["A", array_to_binary(V)]; -field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>]; -field_value_to_binary(double, V) -> ["d", <<V:64/float>>]; -field_value_to_binary(float, V) -> ["f", <<V:32/float>>]; -field_value_to_binary(long, V) -> ["l", <<V:64/signed>>]; -field_value_to_binary(short, V) -> ["s", <<V:16/signed>>]; -field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end]; -field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)]; -field_value_to_binary(void, _V) -> ["V"]. + [$D, Before, <<After:32>>]; +field_value_to_binary(timestamp, V) -> [$T, <<V:64>>]; +field_value_to_binary(table, V) -> [$F | table_to_binary(V)]; +field_value_to_binary(array, V) -> [$A | array_to_binary(V)]; +field_value_to_binary(byte, V) -> [$b, <<V:8/unsigned>>]; +field_value_to_binary(double, V) -> [$d, <<V:64/float>>]; +field_value_to_binary(float, V) -> [$f, <<V:32/float>>]; +field_value_to_binary(long, V) -> [$l, <<V:64/signed>>]; +field_value_to_binary(short, V) -> [$s, <<V:16/signed>>]; +field_value_to_binary(bool, V) -> [$t, if V -> 1; true -> 0 end]; +field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)]; +field_value_to_binary(void, _V) -> [$V]. table_to_binary(Table) when is_list(Table) -> - BinTable = generate_table(Table), - [<<(size(BinTable)):32>>, BinTable]. + BinTable = generate_table_iolist(Table), + [<<(iolist_size(BinTable)):32>> | BinTable]. array_to_binary(Array) when is_list(Array) -> - BinArray = generate_array(Array), - [<<(size(BinArray)):32>>, BinArray]. + BinArray = generate_array_iolist(Array), + [<<(iolist_size(BinArray)):32>> | BinArray]. generate_table(Table) when is_list(Table) -> - list_to_binary(lists:map(fun table_field_to_binary/1, Table)). + list_to_binary(generate_table_iolist(Table)). -generate_array(Array) when is_list(Array) -> - list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, - Array)). +generate_table_iolist(Table) -> + lists:map(fun table_field_to_binary/1, Table). + +generate_array_iolist(Array) -> + lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, Array). -short_string_to_binary(String) when is_binary(String) -> - Len = size(String), - if Len < 256 -> [<<Len:8>>, String]; - true -> exit(content_properties_shortstr_overflow) - end; short_string_to_binary(String) -> - Len = length(String), + Len = string_length(String), if Len < 256 -> [<<Len:8>>, String]; true -> exit(content_properties_shortstr_overflow) end. -long_string_to_binary(String) when is_binary(String) -> - [<<(size(String)):32>>, String]; long_string_to_binary(String) -> - [<<(length(String)):32>>, String]. + Len = string_length(String), + [<<Len:32>>, String]. + +string_length(String) when is_binary(String) -> size(String); +string_length(String) -> length(String). check_empty_frame_size() -> %% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index dc6d090f..f65d8ea7 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -20,6 +20,7 @@ -export([parse_table/1]). -export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([validate_utf8/1, assert_utf8/1]). %%---------------------------------------------------------------------------- @@ -30,6 +31,8 @@ (rabbit_types:content()) -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). +-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error'). +-spec(assert_utf8/1 :: (binary()) -> 'ok'). -endif. @@ -50,35 +53,35 @@ parse_array(<<ValueAndRest/binary>>) -> {Type, Value, Rest} = parse_field_value(ValueAndRest), [{Type, Value} | parse_array(Rest)]. -parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> +parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> {longstr, V, R}; -parse_field_value(<<"I", V:32/signed, R/binary>>) -> +parse_field_value(<<$I, V:32/signed, R/binary>>) -> {signedint, V, R}; -parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) -> +parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) -> {decimal, {Before, After}, R}; -parse_field_value(<<"T", V:64/unsigned, R/binary>>) -> +parse_field_value(<<$T, V:64/unsigned, R/binary>>) -> {timestamp, V, R}; -parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> +parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> {table, parse_table(Table), R}; -parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> +parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> {array, parse_array(Array), R}; -parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R}; -parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R}; -parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R}; -parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R}; -parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R}; -parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; +parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R}; +parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R}; +parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R}; +parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R}; +parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R}; +parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; -parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> +parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> {binary, V, R}; -parse_field_value(<<"V", R/binary>>) -> +parse_field_value(<<$V, R/binary>>) -> {void, undefined, R}. ensure_content_decoded(Content = #content{properties = Props}) @@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) -> Content; clear_decoded_content(Content = #content{}) -> Content#content{properties = none}. + +assert_utf8(B) -> + case validate_utf8(B) of + ok -> ok; + error -> rabbit_misc:protocol_error( + frame_error, "Malformed UTF-8 in shortstr", []) + end. + +validate_utf8(Bin) -> + try + xmerl_ucs:from_utf8(Bin), + ok + catch exit:{ucs, _} -> + error + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bc9ceac0..09798266 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,8 +21,7 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, - flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -37,7 +36,7 @@ conn_name, limiter, tx, next_tag, unacked_message_q, user, virtual_host, most_recently_declared_queue, queue_names, queue_monitors, consumer_mapping, - blocking, queue_consumers, delivering_queues, + queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -53,7 +52,7 @@ messages_uncommitted, acks_uncommitted, prefetch_count, - client_flow_blocked]). + state]). -define(CREATION_EVENT_KEYS, [pid, @@ -98,7 +97,6 @@ -spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'). --spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -148,9 +146,6 @@ send_credit_reply(Pid, Len) -> send_drained(Pid, CTagCredit) -> gen_server2:cast(Pid, {send_drained, CTagCredit}). -flushed(Pid, QPid) -> - gen_server2:cast(Pid, {flushed, QPid}). - list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_channel, list_local, []). @@ -193,6 +188,7 @@ force_event_refresh() -> init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), + ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, protocol = Protocol, @@ -211,7 +207,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), - blocking = sets:new(), queue_consumers = dict:new(), delivering_queues = sets:new(), queue_collector_pid = CollectorPid, @@ -271,7 +266,9 @@ handle_cast({method, Method, Content, Flow}, flow -> credit_flow:ack(Reader); noflow -> ok end, - try handle_method(Method, Content, State) of + try handle_method(rabbit_channel_interceptor:intercept_method( + expand_shortcuts(Method, State)), + Content, State) of {reply, Reply, NewState} -> ok = send(Reply, NewState), noreply(NewState); @@ -287,9 +284,6 @@ handle_cast({method, Method, Content, Flow}, {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast({flushed, QPid}, State) -> - {noreply, queue_blocked(QPid, State), hibernate}; - handle_cast(ready_for_close, State = #ch{state = closing, writer_pid = WriterPid}) -> ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), @@ -364,8 +358,7 @@ handle_info(emit_stats, State) -> handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), - State2 = queue_blocked(QPid, State1), - State3 = handle_consuming_queue_down(QPid, State2), + State3 = handle_consuming_queue_down(QPid, State1), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), #ch{queue_names = QNames, queue_monitors = QMons} = State4, @@ -476,9 +469,8 @@ check_resource_access(User, Resource, Perm) -> put(permission_cache, [V | CacheTail]) end. -clear_permission_cache() -> - erase(permission_cache), - ok. +clear_permission_cache() -> erase(permission_cache), + ok. check_configure_permitted(Resource, #ch{user = User}) -> check_resource_access(User, Resource, configure). @@ -526,31 +518,44 @@ check_msg_size(Content) -> false -> ok end. +qbin_to_resource(QueueNameBin, State) -> + name_to_resource(queue, QueueNameBin, State). + +name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> + rabbit_misc:r(VHostPath, Type, NameBin). + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath, - most_recently_declared_queue = MRDQ}) -> - rabbit_misc:r(VHostPath, queue, MRDQ); -expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) -> - rabbit_misc:r(VHostPath, queue, QueueNameBin). + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> + MRDQ; +expand_queue_name_shortcut(QueueNameBin, _) -> + QueueNameBin. expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. -expand_binding(queue, DestinationNameBin, RoutingKey, State) -> - {expand_queue_name_shortcut(DestinationNameBin, State), - expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)}; -expand_binding(exchange, DestinationNameBin, RoutingKey, State) -> - {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin), - RoutingKey}. +expand_shortcuts(#'basic.get' {queue = Q} = M, State) -> + M#'basic.get' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'basic.consume'{queue = Q} = M, State) -> + M#'basic.consume'{queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.delete' {queue = Q} = M, State) -> + M#'queue.delete' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.purge' {queue = Q} = M, State) -> + M#'queue.purge' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, State) -> + M#'queue.bind' {queue = expand_queue_name_shortcut(Q, State), + routing_key = expand_routing_key_shortcut(Q, K, State)}; +expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, State) -> + M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, State), + routing_key = expand_routing_key_shortcut(Q, K, State)}; +expand_shortcuts(M, _State) -> + M. check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> rabbit_misc:protocol_error( @@ -558,6 +563,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> check_not_default_exchange(_) -> ok. +check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>, + kind = exchange}) -> + rabbit_misc:protocol_error( + access_refused, "deletion of system ~s not allowed", + [rabbit_misc:rs(XName)]); +check_exchange_deletion(_) -> + ok. + %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -574,20 +587,6 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. -queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case sets:is_element(QPid, Blocking) of - false -> State; - true -> maybe_send_flow_ok( - State#ch{blocking = sets:del_element(QPid, Blocking)}) - end. - -maybe_send_flow_ok(State = #ch{blocking = Blocking}) -> - case sets:size(Blocking) of - 0 -> ok = send(#'channel.flow_ok'{active = false}, State); - _ -> ok - end, - State. - record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -600,7 +599,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? + State1 = State#ch{state = running}, + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {reply, #'channel.open_ok'{}, State1}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -710,7 +713,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, conn_pid = ConnPid, limiter = Limiter, next_tag = DeliveryTag}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -748,7 +751,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of @@ -763,13 +766,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> - {CreditArgs, OtherArgs} = parse_credit_args(Args), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), - ActualConsumerTag, ExclusiveConsume, - CreditArgs, OtherArgs, + ActualConsumerTag, ExclusiveConsume, Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -853,8 +854,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, PrefetchCount, queue:len(UAMQ)), - {reply, #'basic.qos_ok'{}, - maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; + case ((not rabbit_limiter:is_active(Limiter)) andalso + rabbit_limiter:is_active(Limiter1)) of + true -> rabbit_amqqueue:activate_limit_all( + consumer_queues(State#ch.consumer_mapping), self()); + false -> ok + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -937,6 +943,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), + check_exchange_deletion(ExchangeName), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> @@ -1057,7 +1064,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_empty = IfEmpty, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with( QueueName, @@ -1096,7 +1103,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -1142,36 +1149,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, - _, State = #ch{limiter = Limiter}) -> - Limiter1 = rabbit_limiter:unblock(Limiter), - {reply, #'channel.flow_ok'{active = true}, - maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; - -handle_method(#'channel.flow'{active = false}, - _, State = #ch{consumer_mapping = Consumers, - limiter = Limiter}) -> - case rabbit_limiter:is_blocked(Limiter) of - true -> {noreply, maybe_send_flow_ok(State)}; - false -> Limiter1 = rabbit_limiter:block(Limiter), - State1 = maybe_limit_queues(Limiter, Limiter1, - State#ch{limiter = Limiter1}), - %% The semantics of channel.flow{active=false} - %% require that no messages are delivered after the - %% channel.flow_ok has been sent. We accomplish that - %% by "flushing" all messages in flight from the - %% consumer queues to us. To do this we tell all the - %% queues to invoke rabbit_channel:flushed/2, which - %% will send us a {flushed, ...} message that appears - %% *after* all the {deliver, ...} messages. We keep - %% track of all the QPids thus asked, and once all of - %% them have responded (or died) we send the - %% channel.flow_ok. - QPids = consumer_queues(Consumers), - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, maybe_send_flow_ok( - State1#ch{blocking = sets:from_list(QPids)})} - end; +handle_method(#'channel.flow'{active = true}, _, State) -> + {reply, #'channel.flow_ok'{active = true}, State}; + +handle_method(#'channel.flow'{active = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "active=false", []); handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, @@ -1256,29 +1238,18 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -parse_credit_args(Arguments) -> - case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of - {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; - _ -> none - end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; - undefined -> {none, Arguments} - end. - binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid }) -> - {DestinationName, ActualRoutingKey} = - expand_binding(DestinationType, DestinationNameBin, RoutingKey, State), + DestinationName = name_to_resource(DestinationType, DestinationNameBin, State), check_write_permitted(DestinationName, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, State), case Fun(#binding{source = ExchangeName, destination = DestinationName, - key = ActualRoutingKey, + key = RoutingKey, args = Arguments}, fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ConnPid) @@ -1331,7 +1302,7 @@ reject(DeliveryTag, Requeue, Multiple, reject(Requeue, Acked, Limiter) -> foreach_per_queue( fun (QPid, MsgIds) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self()) end, Acked), ok = notify_limiter(Limiter, Acked). @@ -1434,15 +1405,6 @@ foreach_per_queue(F, UAL) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). -maybe_limit_queues(OldLimiter, NewLimiter, State) -> - case ((not rabbit_limiter:is_active(OldLimiter)) andalso - rabbit_limiter:is_active(NewLimiter)) of - true -> Queues = consumer_queues(State#ch.consumer_mapping), - rabbit_amqqueue:activate_limit_all(Queues, self()); - false -> ok - end, - State. - consumer_queues(Consumers) -> lists:usort([QPid || {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). @@ -1454,7 +1416,7 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> %% optimisation: avoid the potentially expensive 'foldl' in the %% common case. - case rabbit_limiter:is_prefetch_limited(Limiter) of + case rabbit_limiter:is_active(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 @@ -1618,10 +1580,10 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(state, #ch{state = running}) -> credit_flow:state(); +i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); -i(client_flow_blocked, #ch{limiter = Limiter}) -> - rabbit_limiter:is_blocked(Limiter); i(Item, _) -> throw({bad_argument, Item}). @@ -1642,8 +1604,7 @@ update_measures(Type, Key, Inc, Measure) -> end, put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)). -emit_stats(State) -> - emit_stats(State, []). +emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> Coarse = infos(?STATISTICS_KEYS, State), diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl new file mode 100644 index 00000000..5d1665e0 --- /dev/null +++ b/src/rabbit_channel_interceptor.erl @@ -0,0 +1,91 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +%% Since the AMQP methods used here are queue related, +%% maybe we want this to be a queue_interceptor. + +-module(rabbit_channel_interceptor). + +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([intercept_method/1]). + +-ifdef(use_specs). + +-type(intercept_method() :: rabbit_framing:amqp_method_name()). +-type(original_method() :: rabbit_framing:amqp_method_record()). +-type(processed_method() :: rabbit_framing:amqp_method_record()). + +-callback description() -> [proplists:property()]. + +-callback intercept(original_method()) -> + rabbit_types:ok_or_error2(processed_method(), any()). + +%% Whether the interceptor wishes to intercept the amqp method +-callback applies_to(intercept_method()) -> boolean(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {intercept, 1}, {applies_to, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +%%---------------------------------------------------------------------------- + +intercept_method(#'basic.publish'{} = M) -> + M; +intercept_method(M) -> + intercept_method(M, select(rabbit_misc:method_record_type(M))). + +intercept_method(M, []) -> + M; +intercept_method(M, [I]) -> + case I:intercept(M) of + {ok, M2} -> + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) + end; + {error, Reason} -> + internal_error("Interceptor: ~p failed with reason: ~p", + [I, Reason]) + end; +intercept_method(M, Is) -> + internal_error("More than one interceptor for method: ~p -- ~p", + [rabbit_misc:method_record_type(M), Is]). + +%% select the interceptors that apply to intercept_method(). +select(Method) -> + [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor), + code:which(M) =/= non_existing, + M:applies_to(Method)]. + +validate_method(M, M2) -> + rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). + +internal_error(Format, Args) -> + rabbit_misc:protocol_error(internal_error, Format, Args).
\ No newline at end of file diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index df2e80ca..26f9700e 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,9 +47,9 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, - {tcp, Sock, Channel, FrameMax, - ReaderPid, Protocol}), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {tcp, Sock, Channel, FrameMax, + ReaderPid, Protocol, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), [WriterPid] = supervisor2:find_child(SupPid, writer), {ok, ChannelPid} = @@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, direct), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {direct, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), {ok, ChannelPid} = supervisor2:start_child( @@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, init(Type) -> {ok, {{one_for_all, 0, 1}, child_specs(Type)}}. -child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) -> +child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) -> [{writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid, true]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; -child_specs(direct) -> - [{limiter, {rabbit_limiter, start_link, []}, + [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} + | child_specs({direct, Identity})]; +child_specs({direct, Identity}) -> + [{limiter, {rabbit_limiter, start_link, [Identity]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}]. diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl index e51615e8..f268d8d6 100644 --- a/src/rabbit_connection_helper_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([start_channel_sup_sup/1, - start_queue_collector/1]). + start_queue_collector/2]). -export([init/1]). @@ -31,7 +31,8 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). --spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/2 :: (pid(), rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) -> {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). -start_queue_collector(SupPid) -> +start_queue_collector(SupPid, Identity) -> supervisor2:start_child( SupPid, - {collector, {rabbit_queue_collector, start_link, []}, + {collector, {rabbit_queue_collector, start_link, [Identity]}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 6f36f99d..f3463286 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) -> end. call(Node, {Mod, Fun, Args}) -> - rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)). + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). + +list_to_binary_utf8(L) -> + B = list_to_binary(L), + case rabbit_binary_parser:validate_utf8(B) of + ok -> B; + error -> throw({error, {not_utf_8, L}}) + end. rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl new file mode 100644 index 00000000..640b282e --- /dev/null +++ b/src/rabbit_dead_letter.erl @@ -0,0 +1,141 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_dead_letter). + +-export([publish/5]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(), + 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Msg, Reason, X, RK, QName) -> + DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), + Delivery = rabbit_basic:delivery(false, DLMsg, undefined), + {Queues, Cycles} = detect_cycles(Reason, DLMsg, + rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery), + ok. + +make_msg(Msg = #basic_message{content = Content, + exchange_name = Exchange, + routing_keys = RoutingKeys}, + Reason, DLX, RK, #resource{name = QName}) -> + {DeathRoutingKeys, HeadersFun1} = + case RK of + undefined -> {RoutingKeys, fun (H) -> H end}; + _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} + end, + ReasonBin = list_to_binary(atom_to_list(Reason)), + TimeSec = rabbit_misc:now_ms() div 1000, + PerMsgTTL = per_msg_ttl_header(Content#content.properties), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, + HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, + Info, Headers)) + end, + Content1 = #content{properties = Props} = + rabbit_basic:map_headers(HeadersFun2, Content), + Content2 = Content1#content{properties = + Props#'P_basic'{expiration = undefined}}, + Msg#basic_message{exchange_name = DLX, + id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, + content = Content2}. + +per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> + []; +per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> + [{<<"original-expiration">>, longstr, Expiration}]; +per_msg_ttl_header(_) -> + []. + +detect_cycles(expired, #basic_message{content = Content}, Queues) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + NoCycles = {Queues, []}, + case Headers of + undefined -> + NoCycles; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, Deaths} -> + {Cycling, NotCycling} = + lists:partition(fun (#resource{name = Queue}) -> + is_cycle(Queue, Deaths) + end, Queues), + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- Deaths], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; + _ -> + NoCycles + end + end; +detect_cycles(_Reason, _Msg, Queues) -> + {Queues, []}. + +is_cycle(Queue, Deaths) -> + {Cycle, Rest} = + lists:splitwith( + fun ({table, D}) -> + {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); + (_) -> + true + end, Deaths), + %% Is there a cycle, and if so, is it entirely due to expiry? + case Rest of + [] -> false; + [H|_] -> lists:all( + fun ({table, D}) -> + {longstr, <<"expired">>} =:= + rabbit_misc:table_lookup(D, <<"reason">>); + (_) -> + false + end, Cycle ++ [H]) + end. + +log_cycle_once(Queues) -> + Key = {queue_cycle, Queues}, + case get(Key) of + true -> ok; + undefined -> rabbit_log:warning( + "Message dropped. Dead-letter queues cycle detected" ++ + ": ~p~nThis cycle will NOT be reported again.~n", + [Queues]), + put(Key, true) + end. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 17ed8563..ab8c62fe 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -51,7 +51,7 @@ stop() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, false, []), + topic, true, false, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ca67254b..ff9de67a 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,8 +16,8 @@ -module(rabbit_heartbeat). --export([start/6]). --export([start_heartbeat_sender/3, start_heartbeat_receiver/3, +-export([start/6, start/7]). +-export([start_heartbeat_sender/4, start_heartbeat_receiver/4, pause_monitor/1, resume_monitor/1]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -39,12 +39,17 @@ non_neg_integer(), heartbeat_callback(), non_neg_integer(), heartbeat_callback()) -> heartbeaters()). --spec(start_heartbeat_sender/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). --spec(start_heartbeat_receiver/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). +-spec(start/7 :: + (pid(), rabbit_net:socket(), rabbit_types:proc_name(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). + +-spec(start_heartbeat_sender/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -56,31 +61,35 @@ -endif. %%---------------------------------------------------------------------------- - start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + start(SupPid, Sock, unknown, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun). + +start(SupPid, Sock, Identity, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, - start_heartbeat_sender), + start_heartbeat_sender, Identity), {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, ReceiveFun, heartbeat_receiver, - start_heartbeat_receiver), + start_heartbeat_receiver, Identity), {Sender, Receiver}. -start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> SendFun(), continue end}). + fun () -> SendFun(), continue end}, Identity). -start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> ReceiveFun(), stop end}). + fun () -> ReceiveFun(), stop end}, Identity). pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. @@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. %%---------------------------------------------------------------------------- -start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback, + _Identity) -> {ok, none}; -start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback, + Identity) -> supervisor2:start_child( SupPid, {Name, - {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]}, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). -heartbeater(Params) -> +heartbeater(Params, Identity) -> Deb = sys:debug_options([]), - {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}. + {ok, proc_lib:spawn_link(fun () -> + rabbit_misc:store_proc_name(Identity), + heartbeater(Params, Deb, {0, 0}) + end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Deb, {StatVal, SameCount} = State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d5cfbce6..d37b356c 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -17,8 +17,7 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level %% flow control mechanisms, specifically AMQP 0-9-1's basic.qos -%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer) -%% credit mechanism. +%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with @@ -65,11 +64,9 @@ %% %% 1. Channels tell the limiter about basic.qos prefetch counts - %% that's what the limit_prefetch/3, unlimit_prefetch/1, -%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are -%% about - and channel.flow blocking - that's what block/1, -%% unblock/1 and is_blocked/1 are for. They also tell the limiter -%% queue state (via the queue) about consumer credit changes - -%% that's what credit/4 is for. +%% get_prefetch_limit/1 API functions are about. They also tell the +%% limiter queue state (via the queue) about consumer credit +%% changes - that's what credit/5 is for. %% %% 2. Queues also tell the limiter queue state about the queue %% becoming empty (via drained/1) and consumers leaving (via @@ -83,12 +80,11 @@ %% %% 5. Queues ask the limiter for permission (with can_send/3) whenever %% they want to deliver a message to a channel. The limiter checks -%% whether a) the channel isn't blocked by channel.flow, b) the -%% volume has not yet reached the prefetch limit, and c) whether -%% the consumer has enough credit. If so it increments the volume -%% and tells the queue to proceed. Otherwise it marks the queue as -%% requiring notification (see below) and tells the queue not to -%% proceed. +%% whether a) the volume has not yet reached the prefetch limit, +%% and b) whether the consumer has enough credit. If so it +%% increments the volume and tells the queue to proceed. Otherwise +%% it marks the queue as requiring notification (see below) and +%% tells the queue not to proceed. %% %% 6. A queue that has been told to proceed (by the return value of %% can_send/3) sends the message to the channel. Conversely, a @@ -117,16 +113,17 @@ -module(rabbit_limiter). +-include("rabbit.hrl"). + -behaviour(gen_server2). --export([start_link/0]). +-export([start_link/1]). %% channel API --export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, - is_prefetch_limited/1, is_blocked/1, is_active/1, +-export([new/1, limit_prefetch/3, unlimit_prefetch/1, is_active/1, get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -134,27 +131,23 @@ %%---------------------------------------------------------------------------- --record(lstate, {pid, prefetch_limited, blocked}). +-record(lstate, {pid, prefetch_limited}). -record(qstate, {pid, state, credits}). -ifdef(use_specs). -type(lstate() :: #lstate{pid :: pid(), - prefetch_limited :: boolean(), - blocked :: boolean()}). + prefetch_limited :: boolean()}). -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). -spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) -> lstate()). -spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). --spec(block/1 :: (lstate()) -> lstate()). --spec(unblock/1 :: (lstate()) -> lstate()). --spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). --spec(is_blocked/1 :: (lstate()) -> boolean()). -spec(is_active/1 :: (lstate()) -> boolean()). -spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()). -spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). @@ -168,8 +161,8 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) - -> qstate()). +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), + boolean()) -> {boolean(), qstate()}). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -180,7 +173,6 @@ -record(lim, {prefetch_count = 0, ch_pid, - blocked = false, queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -193,12 +185,12 @@ %% API %%---------------------------------------------------------------------------- -start_link() -> gen_server2:start_link(?MODULE, [], []). +start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. ok = gen_server:call(Pid, {new, self()}, infinity), - #lstate{pid = Pid, prefetch_limited = false, blocked = false}. + #lstate{pid = Pid, prefetch_limited = false}. limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> ok = gen_server:call( @@ -210,19 +202,7 @@ unlimit_prefetch(L) -> ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity), L#lstate{prefetch_limited = false}. -block(L) -> - ok = gen_server:call(L#lstate.pid, block, infinity), - L#lstate{blocked = true}. - -unblock(L) -> - ok = gen_server:call(L#lstate.pid, unblock, infinity), - L#lstate{blocked = false}. - -is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. - -is_blocked(#lstate{blocked = Blocked}) -> Blocked. - -is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). +is_active(#lstate{prefetch_limited = Limited}) -> Limited. get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; get_prefetch_limit(L) -> @@ -276,8 +256,12 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> - Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) -> + {Res, Cr} = case IsEmpty andalso Drain of + true -> {true, make_credit(0, false)}; + false -> {false, make_credit(Credit, Drain)} + end, + {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}. drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = @@ -303,6 +287,10 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... +make_credit(Credit, Drain) -> + %% Using up all credit implies no need to send a 'drained' event + #credit{credit = Credit, drain = Drain andalso Credit > 0}. + decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of {value, #credit{credit = Credit, drain = Drain}} -> @@ -312,15 +300,14 @@ decrement_credit(CTag, Credits) -> end. update_credit(CTag, Credit, Drain, Credits) -> - %% Using up all credit implies no need to send a 'drained' event - Drain1 = Drain andalso Credit > 0, - gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). + gb_trees:update(CTag, make_credit(Credit, Drain), Credits). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> {ok, #lim{}}. +init([ProcName]) -> ?store_proc_name(ProcName), + {ok, #lim{}}. prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; prioritise_call(_Msg, _From, _Len, _State) -> 0. @@ -339,19 +326,10 @@ handle_call(unlimit_prefetch, _From, State) -> {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, volume = 0})}; -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call(unblock, _From, State) -> - {reply, ok, maybe_notify(State, State#lim{blocked = false})}; - handle_call(get_prefetch_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; -handle_call({can_send, QPid, _AckRequired}, _From, - State = #lim{blocked = true}) -> - {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case prefetch_limit_reached(State) of @@ -387,8 +365,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso - not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of + case prefetch_limit_reached(OldState) andalso + not prefetch_limit_reached(NewState) of true -> notify_queues(NewState); false -> NewState end. @@ -396,8 +374,6 @@ maybe_notify(OldState, NewState) -> prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. -blocked(#lim{blocked = Blocked}) -> Blocked. - remember_queue(QPid, State = #lim{queues = Queues}) -> case orddict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index a0e8bcc6..6661408c 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -323,6 +323,7 @@ ensure_monitoring(CPid, Pids) -> %% --------------------------------------------------------------------------- init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> + ?store_proc_name(QueueName), GM1 = case GM of undefined -> {ok, GM2} = gm:start_link( diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 58400f39..9ce5afcb 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats, Log("~p messages to synchronise", [BQ:len(BQS)]), {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), Ref = make_ref(), - Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 96f89ecc..d562210a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -79,6 +79,7 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). init(Q) -> + ?store_proc_name(Q#amqqueue.name), {ok, {not_started, Q}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -616,6 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, KS1 = lists:foldl(fun (ChPid0, KS0) -> pmon:demonitor(ChPid0, KS0) end, KS, AwaitGmDown), + rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName), rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 61e90105..e3fae4c0 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/7, slave/7]). +-export([master_prepare/4, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -61,7 +61,8 @@ -type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), bqs()}). --spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). +-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(), + log_fun(), [pid()]) -> pid()). -spec(master_go/7 :: (pid(), reference(), log_fun(), rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), @@ -80,9 +81,12 @@ %% --------------------------------------------------------------------------- %% Master -master_prepare(Ref, Log, SPids) -> +master_prepare(Ref, QName, Log, SPids) -> MPid = self(), - spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). + spawn_link(fun () -> + ?store_proc_name(QName), + syncer(Ref, Log, MPid, SPids) + end). master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 00c4eaf3..80e160d9 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -70,6 +70,7 @@ -export([interval_operation/4]). -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). +-export([store_proc_name/1, store_proc_name/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -248,6 +249,8 @@ -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). -spec(get_parent/0 :: () -> pid()). +-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). +-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -endif. %%---------------------------------------------------------------------------- @@ -1082,6 +1085,9 @@ stop_timer(State, Idx) -> end end. +store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). +store_proc_name(TypeProcName) -> put(process_name, TypeProcName). + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index e8c96818..401b8ab1 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -222,10 +222,9 @@ maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr); maybe_ntoab(Host) -> Host. rdns(Addr) -> - {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups), - case Lookup of - true -> list_to_binary(rabbit_networking:tcp_host(Addr)); - _ -> Addr + case application:get_env(rabbit, reverse_dns_lookups) of + {ok, true} -> list_to_binary(rabbit_networking:tcp_host(Addr)); + _ -> Addr end. sock_funs(inbound) -> {fun peername/1, fun sockname/1}; diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 10e68198..488f1df5 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -201,7 +201,7 @@ init([]) -> %% writing out the cluster status files - bad things can then %% happen. process_flag(trap_exit, true), - net_kernel:monitor_nodes(true), + net_kernel:monitor_nodes(true, [nodedown_reason]), {ok, _} = mnesia:subscribe(system), {ok, #state{monitors = pmon:new(), subscribers = pmon:new(), @@ -267,7 +267,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{subscribers = Subscribers}) -> {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; -handle_info({nodedown, Node}, State) -> +handle_info({nodedown, Node, Info}, State) -> + rabbit_log:info("node ~p down: ~p~n", + [Node, proplists:get_value(nodedown_reason, Info)]), ok = handle_dead_node(Node), {noreply, State}; diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index b54fdd2e..5a1613a7 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,9 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2]). + is_running/2, is_process_running/2, fqdn_nodename/0]). + +-include_lib("kernel/include/inet.hrl"). -define(EPMD_TIMEOUT, 30000). @@ -35,6 +37,7 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). +-spec(fqdn_nodename/0 :: () -> binary()). -endif. @@ -107,3 +110,9 @@ is_process_running(Node, Process) -> undefined -> false; P when is_pid(P) -> true end. + +fqdn_nodename() -> + {ID, _} = rabbit_nodes:parts(node()), + {ok, Host} = inet:gethostname(), + {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), + list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 6406f7e9..855c7995 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --export([start_link/0, register/2, delete_all/1]). +-export([start_link/1, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -31,7 +31,8 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). @@ -39,8 +40,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link(?MODULE, [], []). +start_link(ProcName) -> + gen_server:start_link(?MODULE, [ProcName], []). register(CollectorPid, Q) -> gen_server:call(CollectorPid, {register, Q}, infinity). @@ -50,7 +51,8 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- -init([]) -> +init([ProcName]) -> + ?store_proc_name(ProcName), {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl new file mode 100644 index 00000000..bea7e0d0 --- /dev/null +++ b/src/rabbit_queue_consumers.erl @@ -0,0 +1,440 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_queue_consumers). + +-export([new/0, max_active_priority/1, inactive/1, all/1, count/0, + unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/2, + possibly_unblock/3, + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, + credit/6, utilisation/1]). + +%%---------------------------------------------------------------------------- + +-define(UNSENT_MESSAGE_LIMIT, 200). + +-record(state, {consumers, use}). + +-record(consumer, {tag, ack_required, args}). + +%% These are held in our process dictionary +-record(cr, {ch_pid, + monitor_ref, + acktags, + consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason + blocked_consumers, + %% The limiter itself + limiter, + %% Internal flow control for queue -> writer + unsent_message_count}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type time_micros() :: non_neg_integer(). +-type ratio() :: float(). +-type state() :: #state{consumers ::priority_queue:q(), + use :: {'inactive', + time_micros(), time_micros(), ratio()} | + {'active', time_micros(), ratio()}}. +-type ch() :: pid(). +-type ack() :: non_neg_integer(). +-type cr_fun() :: fun ((#cr{}) -> #cr{}). +-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. + +-spec new() -> state(). +-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. +-spec inactive(state()) -> boolean(). +-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]. +-spec count() -> non_neg_integer(). +-spec unacknowledged_message_count() -> non_neg_integer(). +-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), + rabbit_framing:amqp_table(), boolean(), state()) -> state(). +-spec remove(ch(), rabbit_types:ctag(), state()) -> + 'not_found' | state(). +-spec erase_ch(ch(), state()) -> + 'not_found' | {[ack()], [rabbit_types:ctag()], + state()}. +-spec send_drained() -> 'ok'. +-spec deliver(fun ((boolean()) -> {fetch_result(), T}), + rabbit_amqqueue:name(), state()) -> + {'delivered', boolean(), T, state()} | + {'undelivered', boolean(), state()}. +-spec record_ack(ch(), pid(), ack()) -> 'ok'. +-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec possibly_unblock(cr_fun(), ch(), state()) -> + 'unchanged' | {'unblocked', state()}. +-spec resume_fun() -> cr_fun(). +-spec notify_sent_fun(non_neg_integer()) -> cr_fun(). +-spec activate_limit_fun() -> cr_fun(). +-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), + state()) -> 'unchanged' | {'unblocked', state()}. +-spec utilisation(state()) -> ratio(). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> #state{consumers = priority_queue:new(), + use = {inactive, now_micros(), 0, 0.0}}. + +max_active_priority(#state{consumers = Consumers}) -> + priority_queue:highest(Consumers). + +inactive(#state{consumers = Consumers}) -> + priority_queue:is_empty(Consumers). + +all(#state{consumers = Consumers}) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, + consumers(Consumers, []), all_ch_record()). + +consumers(Consumers, Acc) -> + priority_queue:fold( + fun ({ChPid, Consumer}, _P, Acc1) -> + #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, + [{ChPid, CTag, Ack, Args} | Acc1] + end, Acc, Consumers). + +count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). + +unacknowledged_message_count() -> + lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). + +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, + State = #state{consumers = Consumers}) -> + C = #cr{consumer_count = Count, + limiter = Limiter} = ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, + update_ch_record(case parse_credit_args(Args) of + none -> C1; + {Crd, Drain} -> credit_and_drain( + C1, CTag, Crd, Drain, IsEmpty) + end), + Consumer = #consumer{tag = CTag, + ack_required = not NoAck, + args = Args}, + State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. + +remove(ChPid, CTag, State = #state{consumers = Consumers}) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{consumer_count = Count, + limiter = Limiter, + blocked_consumers = Blocked} -> + Blocked1 = remove_consumer(ChPid, CTag, Blocked), + Limiter1 = case Count of + 1 -> rabbit_limiter:deactivate(Limiter); + _ -> Limiter + end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag), + update_ch_record(C#cr{consumer_count = Count - 1, + limiter = Limiter2, + blocked_consumers = Blocked1}), + State#state{consumers = + remove_consumer(ChPid, CTag, Consumers)} + end. + +erase_ch(ChPid, State = #state{consumers = Consumers}) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + blocked_consumers = BlockedQ} -> + AllConsumers = priority_queue:join(Consumers, BlockedQ), + ok = erase_ch_record(C), + {queue:to_list(ChAckTags), + tags(priority_queue:to_list(AllConsumers)), + State#state{consumers = remove_consumers(ChPid, Consumers)}} + end. + +send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], + ok. + +deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). + +deliver(FetchFun, QName, ConsumersChanged, + State = #state{consumers = Consumers}) -> + case priority_queue:out_p(Consumers) of + {empty, _} -> + {undelivered, ConsumersChanged, + State#state{use = update_use(State#state.use, inactive)}}; + {{value, QEntry, Priority}, Tail} -> + case deliver_to_consumer(FetchFun, QEntry, QName) of + {delivered, R} -> + {delivered, ConsumersChanged, R, + State#state{consumers = priority_queue:in(QEntry, Priority, + Tail)}}; + undelivered -> + deliver(FetchFun, QName, true, + State#state{consumers = Tail}) + end + end. + +deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> + C = lookup_ch(ChPid), + case is_ch_blocked(C) of + true -> block_consumer(C, E), + undelivered; + false -> case rabbit_limiter:can_send(C#cr.limiter, + Consumer#consumer.ack_required, + Consumer#consumer.tag) of + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + undelivered; + {continue, Limiter} -> + {delivered, deliver_to_consumer( + FetchFun, Consumer, + C#cr{limiter = Limiter}, QName)} + end + end. + +deliver_to_consumer(FetchFun, + #consumer{tag = CTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + QName) -> + {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), + rabbit_channel:deliver(ChPid, CTag, AckRequired, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> queue:in(AckTag, ChAckTags); + false -> ChAckTags + end, + update_ch_record(C#cr{acktags = ChAckTags1, + unsent_message_count = Count + 1}), + R. + +record_ack(ChPid, LimiterPid, AckTag) -> + C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), + update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), + ok. + +subtract_acks(ChPid, AckTags) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{acktags = ChAckTags} -> + update_ch_record( + C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), + ok + end. + +subtract_acks([], [], AckQ) -> + AckQ; +subtract_acks([], Prefix, AckQ) -> + queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); +subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> + case queue:out(AckQ) of + {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); + {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + end. + +possibly_unblock(Update, ChPid, State) -> + case lookup_ch(ChPid) of + not_found -> unchanged; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + unchanged; + true -> unblock(C1, State) + end + end. + +unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, + State = #state{consumers = Consumers, use = Use}) -> + case lists:partition( + fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, priority_queue:to_list(BlockedQ)) of + {_, []} -> + update_ch_record(C), + unchanged; + {Blocked, Unblocked} -> + BlockedQ1 = priority_queue:from_list(Blocked), + UnblockedQ = priority_queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ1}), + {unblocked, + State#state{consumers = priority_queue:join(Consumers, UnblockedQ), + use = update_use(Use, active)}} + end. + +resume_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:resume(Limiter)} + end. + +notify_sent_fun(Credit) -> + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - Credit} + end. + +activate_limit_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:activate(Limiter)} + end. + +credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> + case lookup_ch(ChPid) of + not_found -> + unchanged; + #cr{limiter = Limiter} = C -> + C1 = #cr{limiter = Limiter1} = + credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + case is_ch_blocked(C1) orelse + (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse + rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of + true -> update_ch_record(C1), + unchanged; + false -> unblock(C1, State) + end + end. + +utilisation(#state{use = {active, Since, Avg}}) -> + use_avg(now_micros() - Since, 0, Avg); +utilisation(#state{use = {inactive, Since, Active, Avg}}) -> + use_avg(Active, now_micros() - Since, Avg). + +%%---------------------------------------------------------------------------- + +parse_credit_args(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + +lookup_ch(ChPid) -> + case get({ch, ChPid}) of + undefined -> not_found; + C -> C + end. + +ch_record(ChPid, LimiterPid) -> + Key = {ch, ChPid}, + case get(Key) of + undefined -> MonitorRef = erlang:monitor(process, ChPid), + Limiter = rabbit_limiter:client(LimiterPid), + C = #cr{ch_pid = ChPid, + monitor_ref = MonitorRef, + acktags = queue:new(), + consumer_count = 0, + blocked_consumers = priority_queue:new(), + limiter = Limiter, + unsent_message_count = 0}, + put(Key, C), + C; + C = #cr{} -> C + end. + +update_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + unsent_message_count = UnsentMessageCount}) -> + case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of + {true, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) + end, + C. + +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C), + ok. + +erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> + erlang:demonitor(MonitorRef), + erase({ch, ChPid}), + ok. + +all_ch_record() -> [C || {{ch, _}, C} <- get()]. + +block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> + update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}). + +is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). + +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> C; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + C#cr{limiter = Limiter2} + end. + +credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, + CTag, Credit, Drain, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + {true, Limiter1} -> rabbit_channel:send_drained(ChPid, + [{CTag, Credit}]), + C#cr{limiter = Limiter1}; + {false, Limiter1} -> C#cr{limiter = Limiter1} + end. + +tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList]. + +add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> + Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of + {_, P} -> P; + _ -> 0 + end, + priority_queue:in({ChPid, Consumer}, Priority, Queue). + +remove_consumer(ChPid, CTag, Queue) -> + priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= CTag) + end, Queue). + +remove_consumers(ChPid, Queue) -> + priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false; + (_) -> true + end, Queue). + +update_use({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_use({active, _, _} = CUInfo, active) -> + CUInfo; +update_use({active, Since, Avg}, inactive) -> + Now = now_micros(), + {inactive, Now, Now - Since, Avg}; +update_use({inactive, Since, Active, Avg}, active) -> + Now = now_micros(), + {active, Now, use_avg(Active, Now - Since, Avg)}. + +use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. + +now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 8f6375a5..6205e2dc 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -8,13 +8,6 @@ -ifdef(use_specs). --type(notify_event() :: 'consumer_blocked' | - 'consumer_unblocked' | - 'queue_empty' | - 'basic_consume' | - 'basic_cancel' | - 'refresh'). - -callback startup(rabbit_types:amqqueue()) -> 'ok'. -callback shutdown(rabbit_types:amqqueue()) -> 'ok'. @@ -24,7 +17,9 @@ -callback active_for(rabbit_types:amqqueue()) -> boolean(). --callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. +%% called with Queue, MaxActivePriority, IsEmpty +-callback consumer_state_changed( + rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'. -else. @@ -32,7 +27,7 @@ behaviour_info(callbacks) -> [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, - {active_for, 1}, {notify, 3}]; + {active_for, 1}, {consumer_state_changed, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1ae9bacf..64debcab 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/2, mainloop/2, recvloop/2]). +-export([init/2, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). @@ -32,15 +32,16 @@ -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +-define(CHANNEL_MIN, 1). %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, - stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). + stats_timer, channel_sup_sup_pid, channel_count, throttle}). -record(connection, {name, host, peer_host, port, peer_port, - protocol, user, timeout_sec, frame_max, vhost, + protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, auth_mechanism, auth_state}). @@ -48,15 +49,14 @@ blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, last_blocked_by, last_blocked_age, - channels]). + send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, name, port, peer_port, host, peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, client_properties]). + timeout, frame_max, channel_max, client_properties]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -91,9 +91,10 @@ rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). --spec(mainloop/2 :: (_,#v1{}) -> any()). +-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). --spec(system_continue/3 :: (_,_,#v1{}) -> any()). +-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) -> + any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). -endif. @@ -113,8 +114,8 @@ init(Parent, HelperSup) -> start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. -system_continue(Parent, Deb, State) -> - mainloop(Deb, State#v1{parent = Parent}). +system_continue(Parent, Deb, {Buf, BufLen, State}) -> + mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -213,6 +214,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), + ?store_proc_name(list_to_binary(Name)), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -238,8 +240,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, - buf = [], - buf_len = 0, + channel_count = 0, throttle = #throttle{ alarmed_by = [], last_blocked_by = none, @@ -247,9 +248,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> blocked_sent = false}}, try run({?MODULE, recvloop, - [Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)]}), + [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -276,31 +277,41 @@ run({M, F, A}) -> catch {become, MFA} -> run(MFA) end. -recvloop(Deb, State = #v1{pending_recv = true}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{connection_state = blocked}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) +recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> + throw({become, F(Deb, Buf, BufLen, State)}); +recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> case rabbit_net:setopts(Sock, [{active, once}]) of - ok -> mainloop(Deb, State#v1{pending_recv = true}); + ok -> mainloop(Deb, Buf, BufLen, + State#v1{pending_recv = true}); {error, Reason} -> stop(Reason, State) end; -recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> - {Data, Rest} = split_binary(case Buf of - [B] -> B; - _ -> list_to_binary(lists:reverse(Buf)) - end, RecvLen), - recvloop(Deb, handle_input(State#v1.callback, Data, - State#v1{buf = [Rest], - buf_len = BufLen - RecvLen})). - -mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> +recvloop(Deb, [B], _BufLen, State) -> + {Rest, State1} = handle_input(State#v1.callback, B, State), + recvloop(Deb, [Rest], size(Rest), State1); +recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) -> + {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []), + Data = list_to_binary(lists:reverse(DataLRev)), + {<<>>, State1} = handle_input(State#v1.callback, Data, State), + recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1). + +binlist_split(0, L, Acc) -> + {L, Acc}; +binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> + {H, T} = split_binary(Acc0, -Len), + {[H|L], [T|Acc]}; +binlist_split(Len, [H|T], Acc) -> + binlist_split(Len - size(H), T, [H|Acc]). + +mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> case rabbit_net:recv(Sock) of {data, Data} -> - recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); + recvloop(Deb, [Data | Buf], BufLen + size(Data), + State#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> @@ -309,11 +320,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> stop(Reason, State); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, - ?MODULE, Deb, State); + ?MODULE, Deb, {Buf, BufLen, State}); {other, Other} -> case handle_other(Other, State) of stop -> ok; - NewState -> recvloop(Deb, NewState) + NewState -> recvloop(Deb, Buf, BufLen, NewState) end end. @@ -333,8 +344,8 @@ handle_other({conserve_resources, Source, Conserve}, control_throttle(State#v1{throttle = Throttle1}); handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), - channel_cleanup(ChPid), - maybe_close(control_throttle(State)); + {_, State1} = channel_cleanup(ChPid, State), + maybe_close(control_throttle(State1)); handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -491,63 +502,59 @@ close_connection(State = #v1{queue_collector = Collector, State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> - case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, controlled} -> State; + {Channel, State1} = channel_cleanup(ChPid, State), + case {Channel, termination_kind(Reason)} of + {undefined, controlled} -> State1; {undefined, uncontrolled} -> exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> maybe_close(control_throttle(State)); - {Channel, uncontrolled} -> State1 = handle_exception( - State, Channel, Reason), - maybe_close(control_throttle(State1)) + {_, controlled} -> maybe_close(control_throttle(State1)); + {_, uncontrolled} -> State2 = handle_exception( + State1, Channel, Reason), + maybe_close(control_throttle(State2)) end. -terminate_channels() -> - NChannels = - length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), - if NChannels > 0 -> - Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, - TimerRef = erlang:send_after(Timeout, self(), cancel_wait), - wait_for_channel_termination(NChannels, TimerRef); - true -> ok - end. +terminate_channels(#v1{channel_count = 0} = State) -> + State; +terminate_channels(#v1{channel_count = ChannelCount} = State) -> + lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), + Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount, + TimerRef = erlang:send_after(Timeout, self(), cancel_wait), + wait_for_channel_termination(ChannelCount, TimerRef, State). -wait_for_channel_termination(0, TimerRef) -> +wait_for_channel_termination(0, TimerRef, State) -> case erlang:cancel_timer(TimerRef) of false -> receive - cancel_wait -> ok + cancel_wait -> State end; - _ -> ok + _ -> State end; - -wait_for_channel_termination(N, TimerRef) -> +wait_for_channel_termination(N, TimerRef, State) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, _} -> - exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> - wait_for_channel_termination(N-1, TimerRef); - {Channel, uncontrolled} -> - log(error, - "AMQP connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), - wait_for_channel_termination(N-1, TimerRef) + {Channel, State1} = channel_cleanup(ChPid, State), + case {Channel, termination_kind(Reason)} of + {undefined, _} -> exit({abnormal_dependent_exit, + ChPid, Reason}); + {_, controlled} -> wait_for_channel_termination( + N-1, TimerRef, State1); + {_, uncontrolled} -> log(error, + "AMQP connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), + wait_for_channel_termination( + N-1, TimerRef, State1) end; cancel_wait -> exit(channel_termination_timeout) end. maybe_close(State = #v1{connection_state = closing, - connection = #connection{protocol = Protocol}, - sock = Sock}) -> - case all_channels() of - [] -> - NewState = close_connection(State), - ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), - NewState; - _ -> State - end; + channel_count = 0, + connection = #connection{protocol = Protocol}, + sock = Sock}) -> + NewState = close_connection(State), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), + NewState; maybe_close(State) -> State. @@ -566,8 +573,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol}, [self(), CS, Channel, Reason]), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - terminate_channels(), - State1 = close_connection(State), + State1 = close_connection(terminate_channels(State)), ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), State1; handle_exception(State, Channel, Reason) -> @@ -605,15 +611,26 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) -> %%-------------------------------------------------------------------------- -create_channel(Channel, State) -> - #v1{sock = Sock, queue_collector = Collector, - channel_sup_sup_pid = ChanSupSup, - connection = #connection{name = Name, - protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost, - capabilities = Capabilities}} = State, +create_channel(_Channel, + #v1{channel_count = ChannelCount, + connection = #connection{channel_max = ChannelMax}}) + when ChannelMax /= 0 andalso ChannelCount >= ChannelMax -> + {error, rabbit_misc:amqp_error( + not_allowed, "number of channels opened (~w) has reached the " + "negotiated channel_max (~w)", + [ChannelCount, ChannelMax], 'none')}; +create_channel(Channel, + #v1{sock = Sock, + queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + channel_count = ChannelCount, + connection = + #connection{name = Name, + protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State) -> {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, @@ -621,16 +638,16 @@ create_channel(Channel, State) -> MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), - {ChPid, AState}. + {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}. -channel_cleanup(ChPid) -> +channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> case get({ch_pid, ChPid}) of - undefined -> undefined; + undefined -> {undefined, State}; {Channel, MRef} -> credit_flow:peer_down(ChPid), erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), - Channel + {Channel, State#v1{channel_count = ChannelCount - 1}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -669,32 +686,36 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - {ChPid, AState} = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> - rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); - {error, Reason} -> - handle_exception(State, Channel, Reason) + case (case get(ChKey) of + undefined -> create_channel(Channel, State); + Other -> {ok, Other, State} + end) of + {error, Error} -> + handle_exception(State, Channel, Error); + {ok, {ChPid, AState}, State1} -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State1); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State1); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State1)); + {error, Reason} -> + handle_exception(State1, Channel, Reason) + end end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> - channel_cleanup(ChPid), + {_, State1} = channel_cleanup(ChPid, State), %% This is not strictly necessary, but more obviously %% correct. Also note that we do not need to call maybe_close/1 %% since we cannot possibly be in the 'closing' state. - control_throttle(State); + control_throttle(State1); post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> maybe_block(State); post_process_frame({content_body, _}, _ChPid, State) -> @@ -708,32 +729,36 @@ post_process_frame(_Frame, _ChPid, State) -> %% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. -define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>, State = #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> fatal_frame_error( {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, Type, Channel, <<>>, State); -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - ensure_stats_timer( - switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1)); - +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, + Payload:PayloadSize/binary, ?FRAME_END, + Rest/binary>>, + State) -> + {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))}; +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>, + State) -> + {Rest, ensure_stats_timer( + switch_callback(State, + {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1))}; handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> - <<Payload:PayloadSize/binary, EndMarker>> = Data, + <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data, case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), - switch_callback(State1, frame_header, 7); + {Rest, switch_callback(State1, frame_header, 7)}; _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, Type, Channel, Payload, State) end; - -handle_input(handshake, <<"AMQP", A, B, C, D>>, State) -> - handshake({A, B, C, D}, State); -handle_input(handshake, Other, #v1{sock = Sock}) -> +handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) -> + {Rest, handshake({A, B, C, D}, State)}; +handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); - handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). @@ -848,38 +873,33 @@ handle_method0(#'connection.secure_ok'{response = Response}, State = #v1{connection_state = securing}) -> auth_phase(Response, State); -handle_method0(#'connection.tune_ok'{frame_max = FrameMax, - heartbeat = ClientHeartbeat}, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, + channel_max = ChannelMax, + heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, helper_sup = SupPid, sock = Sock}) -> - ServerFrameMax = server_frame_max(), - if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w < ~w min size", - [FrameMax, ?FRAME_MIN_SIZE]); - ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w > ~w max size", - [FrameMax, ServerFrameMax]); - true -> - {ok, Collector} = - rabbit_connection_helper_sup:start_queue_collector(SupPid), - Frame = rabbit_binary_generator:build_heartbeat_frame(), - SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, - Parent = self(), - ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, ReceiveFun), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, - queue_collector = Collector, - heartbeater = Heartbeater} - end; + ok = validate_negotiated_integer_value( + frame_max, ?FRAME_MIN_SIZE, FrameMax), + ok = validate_negotiated_integer_value( + channel_max, ?CHANNEL_MIN, ChannelMax), + {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector( + SupPid, Connection#connection.name), + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, + Parent = self(), + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Heartbeater = rabbit_heartbeat:start( + SupPid, Sock, Connection#connection.name, + ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), + State#v1{connection_state = opening, + connection = Connection#connection{ + frame_max = FrameMax, + channel_max = ChannelMax, + timeout_sec = ClientHeartbeat}, + queue_collector = Collector, + heartbeater = Heartbeater}; handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, @@ -927,13 +947,28 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -server_frame_max() -> - {ok, FrameMax} = application:get_env(rabbit, frame_max), - FrameMax. +validate_negotiated_integer_value(Field, Min, ClientValue) -> + ServerValue = get_env(Field), + if ClientValue /= 0 andalso ClientValue < Min -> + fail_negotiation(Field, min, ServerValue, ClientValue); + ServerValue /= 0 andalso ClientValue > ServerValue -> + fail_negotiation(Field, max, ServerValue, ClientValue); + true -> + ok + end. -server_heartbeat() -> - {ok, Heartbeat} = application:get_env(rabbit, heartbeat), - Heartbeat. +fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> + {S1, S2} = case MinOrMax of + min -> {lower, minimum}; + max -> {higher, maximum} + end, + rabbit_misc:protocol_error( + not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)", + [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). + +get_env(Key) -> + {ok, Value} = application:get_env(rabbit, Key), + Value. send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). @@ -999,9 +1034,9 @@ auth_phase(Response, State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User} -> - Tune = #'connection.tune'{channel_max = 0, - frame_max = server_frame_max(), - heartbeat = server_heartbeat()}, + Tune = #'connection.tune'{frame_max = get_env(frame_max), + channel_max = get_env(channel_max), + heartbeat = get_env(heartbeat)}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{user = User, @@ -1028,13 +1063,15 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); -i(state, #v1{connection_state = CS}) -> CS; -i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) -> - infinity; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) -> - timer:now_diff(erlang:now(), T) / 1000000; -i(channels, #v1{}) -> length(all_channels()); +i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount; +i(state, #v1{connection_state = ConnectionState, + throttle = #throttle{last_blocked_by = BlockedBy, + last_blocked_at = T}}) -> + Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000, + case {BlockedBy, Recent} of + {flow, true} -> flow; + {_, _} -> ConnectionState + end; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). ic(name, #connection{name = Name}) -> Name; @@ -1049,6 +1086,7 @@ ic(user, #connection{user = U}) -> U#user.username; ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; +ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; @@ -1089,10 +1127,8 @@ emit_stats(State) -> %% If we emit an event which looks like we are in flow control, it's not a %% good idea for it to be our last even if we go idle. Keep emitting %% events, either we stay busy or we drop out of flow control. - %% The 5 is to match the test in formatters.js:fmt_connection_state(). - %% This magic number will go away when bug 24829 is merged. - case proplists:get_value(last_blocked_age, Infos) < 5 of - true -> ensure_stats_timer(State1); + case proplists:get_value(state, Infos) of + flow -> ensure_stats_timer(State1); _ -> State1 end. @@ -1110,15 +1146,16 @@ become_1_0(Id, State = #v1{sock = Sock}) -> Sock, {unsupported_amqp1_0_protocol_id, Id}, {3, 1, 0, 0}) end, - throw({become, {rabbit_amqp1_0_reader, init, - [Mode, pack_for_1_0(State)]}}) + F = fun (_Deb, Buf, BufLen, S) -> + {rabbit_amqp1_0_reader, init, + [Mode, pack_for_1_0(Buf, BufLen, S)]} + end, + State = #v1{connection_state = {become, F}} end. -pack_for_1_0(#v1{parent = Parent, - sock = Sock, - recv_len = RecvLen, - pending_recv = PendingRecv, - helper_sup = SupPid, - buf = Buf, - buf_len = BufLen}) -> +pack_for_1_0(Buf, BufLen, #v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + helper_sup = SupPid}) -> {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 3014aeb7..abb71e7a 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(queue_decorator) -> rabbit_queue_decorator; -class_module(policy_validator) -> rabbit_policy_validator; -class_module(ha_mode) -> rabbit_mirror_queue_mode. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; +class_module(policy_validator) -> rabbit_policy_validator; +class_module(ha_mode) -> rabbit_mirror_queue_mode; +class_module(channel_interceptor) -> rabbit_channel_interceptor. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5fe319d3..731351f0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1172,7 +1172,7 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, [], undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -1262,7 +1262,7 @@ test_writer(Pid) -> test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], Me, Limiter), @@ -2815,7 +2815,7 @@ test_queue_recover() -> end, rabbit_amqqueue:stop(), rabbit_amqqueue:start(rabbit_amqqueue:recover()), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> @@ -2842,7 +2842,7 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:set_ram_duration_target(QPid, 0), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index a36613db..0edebff1 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -30,7 +30,8 @@ connection/0, protocol/0, user/0, internal_user/0, username/0, password/0, password_hash/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, - channel_exit/0, connection_exit/0, mfargs/0]). + channel_exit/0, connection_exit/0, mfargs/0, proc_name/0, + proc_type_and_name/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +157,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). +-type(proc_name() :: term()). +-type(proc_type_and_name() :: {atom(), proc_name()}). + -endif. % use_specs diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 6f95ef60..90372461 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -46,6 +46,7 @@ -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). +-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). %% ------------------------------------------------------------------- @@ -74,6 +75,7 @@ -spec(exchange_decorators/0 :: () -> 'ok'). -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). +-spec(internal_system_x/0 :: () -> 'ok'). -endif. @@ -340,6 +342,19 @@ queue_decorators(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, policy, gm_pids, decorators]). +internal_system_x() -> + transform( + rabbit_durable_exchange, + fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>}, + Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) -> + {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches, + Policy, Decorators}; + (X) -> + X + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 8d013d43..047bce77 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -60,15 +60,17 @@ add(VHostPath) -> (ok, false) -> [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}, - {<<"amq.rabbitmq.trace">>, topic}]], + Type, true, false, Internal, []) || + {Name, Type, Internal} <- + [{<<"">>, direct, false}, + {<<"amq.direct">>, direct, false}, + {<<"amq.topic">>, topic, false}, + %% per 0-9-1 pdf + {<<"amq.match">>, headers, false}, + %% per 0-9-1 xml + {<<"amq.headers">>, headers, false}, + {<<"amq.fanout">>, fanout, false}, + {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), rabbit_event:notify(vhost_created, info(VHostPath)), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 34dd3d3b..3571692b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, start/6, start_link/6]). +-export([start/6, start_link/6, start/7, start_link/7]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -30,7 +30,7 @@ -export([internal_send_command/4, internal_send_command/6]). %% internal --export([mainloop/2, mainloop1/2]). +-export([enter_mainloop/2, mainloop/2, mainloop1/2]). -record(wstate, {sock, channel, frame_max, protocol, reader, stats_timer, pending}). @@ -41,21 +41,25 @@ -ifdef(use_specs). --spec(start/5 :: +-spec(start/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name()) -> rabbit_types:ok(pid())). --spec(start_link/5 :: +-spec(start_link/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name()) -> rabbit_types:ok(pid())). --spec(start/6 :: +-spec(start/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid())). --spec(start_link/6 :: +-spec(start_link/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid())). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). @@ -99,23 +103,23 @@ %%--------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - start(Sock, Channel, FrameMax, Protocol, ReaderPid, false). +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> + start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false). +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> + start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). -start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - Deb = sys:debug_options([]), - {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}. + {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}. -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - Deb = sys:debug_options([]), - {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}. + {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}. initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> (case ReaderWantsStats of @@ -138,6 +142,11 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +enter_mainloop(Identity, State) -> + Deb = sys:debug_options([]), + ?store_proc_name(Identity), + mainloop(Deb, State). + mainloop(Deb, State) -> try mainloop1(Deb, State) |