diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-13 10:54:43 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-13 10:54:43 +0000 |
commit | 20a47dd600054b1ff0d60cefc81f0f743765eb4b (patch) | |
tree | 23a915a8144f74404e7e2c3c474517e927e65a7f | |
parent | 15ec271e89915308fdaa88ab94481a4f31a00c7c (diff) | |
parent | e212cea92aaece5e7c49019d25be2abfd6d8db47 (diff) | |
download | rabbitmq-server-20a47dd600054b1ff0d60cefc81f0f743765eb4b.tar.gz |
merge default into bug25827
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/gm.erl | 12 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 127 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 55 | ||||
-rw-r--r-- | src/rabbit_channel_interceptor.erl | 91 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 20 | ||||
-rw-r--r-- | src/rabbit_connection_helper_sup.erl | 9 | ||||
-rw-r--r-- | src/rabbit_dead_letter.erl | 141 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 55 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 22 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 1 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 12 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_net.erl | 7 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 12 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 39 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 11 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 15 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_types.erl | 6 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 49 |
23 files changed, 453 insertions, 249 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index bd7a0eed..afb6e576 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -111,3 +111,5 @@ -define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). + +-define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)). @@ -542,6 +542,7 @@ forget_group(GroupName) -> ok. init([GroupName, Module, Args, TxnFun]) -> + put(process_name, {?MODULE, GroupName}), {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), @@ -898,13 +899,10 @@ internal_broadcast(Msg, From, State = #state { self = Self, State1 = State #state { pub_count = PubCount1, confirms = Confirms1, broadcast_buffer = Buffer1 }, - case From =/= none of - true -> - handle_callback_result({Result, flush_broadcast_buffer(State1)}); - false -> - handle_callback_result( - {Result, State1 #state { broadcast_buffer = Buffer1 }}) - end. + handle_callback_result({Result, case From of + none -> State1; + _ -> flush_broadcast_buffer(State1) + end}). flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c3de2c08..5c098755 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -107,6 +107,7 @@ info_keys() -> ?INFO_KEYS. init(Q) -> process_flag(trap_exit, true), + ?store_proc_name(Q#amqqueue.name), {ok, init_state(Q#amqqueue{pid = self()}), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -722,117 +723,17 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, QName = qname(State), {Res, Acks1, BQS1} = Fun(fun (Msg, AckTag, Acks) -> - dead_letter_publish(Msg, Reason, X, RK, QName), + rabbit_dead_letter:publish(Msg, Reason, X, RK, QName), [AckTag | Acks] end, [], BQS), {_Guids, BQS2} = BQ:ack(Acks1, BQS1), {Res, State#q{backing_queue_state = BQS2}}. -dead_letter_publish(Msg, Reason, X, RK, QName) -> - DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, undefined), - {Queues, Cycles} = detect_dead_letter_cycles( - Reason, DLMsg, rabbit_exchange:route(X, Delivery)), - lists:foreach(fun log_cycle_once/1, Cycles), - rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), - ok. - stop(State) -> stop(noreply, State). stop(noreply, State) -> {stop, normal, State}; stop(Reply, State) -> {stop, normal, Reply, State}. - -detect_dead_letter_cycles(expired, - #basic_message{content = Content}, Queues) -> - #content{properties = #'P_basic'{headers = Headers}} = - rabbit_binary_parser:ensure_content_decoded(Content), - NoCycles = {Queues, []}, - case Headers of - undefined -> - NoCycles; - _ -> - case rabbit_misc:table_lookup(Headers, <<"x-death">>) of - {array, Deaths} -> - {Cycling, NotCycling} = - lists:partition( - fun (#resource{name = Queue}) -> - is_dead_letter_cycle(Queue, Deaths) - end, Queues), - OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || - {table, D} <- Deaths], - OldQueues1 = [QName || {longstr, QName} <- OldQueues], - {NotCycling, [[QName | OldQueues1] || - #resource{name = QName} <- Cycling]}; - _ -> - NoCycles - end - end; -detect_dead_letter_cycles(_Reason, _Msg, Queues) -> - {Queues, []}. - -is_dead_letter_cycle(Queue, Deaths) -> - {Cycle, Rest} = - lists:splitwith( - fun ({table, D}) -> - {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); - (_) -> - true - end, Deaths), - %% Is there a cycle, and if so, is it entirely due to expiry? - case Rest of - [] -> false; - [H|_] -> lists:all( - fun ({table, D}) -> - {longstr, <<"expired">>} =:= - rabbit_misc:table_lookup(D, <<"reason">>); - (_) -> - false - end, Cycle ++ [H]) - end. - -make_dead_letter_msg(Msg = #basic_message{content = Content, - exchange_name = Exchange, - routing_keys = RoutingKeys}, - Reason, DLX, RK, #resource{name = QName}) -> - {DeathRoutingKeys, HeadersFun1} = - case RK of - undefined -> {RoutingKeys, fun (H) -> H end}; - _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} - end, - ReasonBin = list_to_binary(atom_to_list(Reason)), - TimeSec = rabbit_misc:now_ms() div 1000, - PerMsgTTL = per_msg_ttl_header(Content#content.properties), - HeadersFun2 = - fun (Headers) -> - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - RKs1 = [{longstr, Key} || Key <- RKs], - Info = [{<<"reason">>, longstr, ReasonBin}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, TimeSec}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, - HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, - Info, Headers)) - end, - Content1 = #content{properties = Props} = - rabbit_basic:map_headers(HeadersFun2, Content), - Content2 = Content1#content{properties = - Props#'P_basic'{expiration = undefined}}, - Msg#basic_message{exchange_name = DLX, - id = rabbit_guid:gen(), - routing_keys = DeathRoutingKeys, - content = Content2}. - -per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> - []; -per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> - [{<<"original-expiration">>, longstr, Expiration}]; -per_msg_ttl_header(_) -> - []. - now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1217,13 +1118,18 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS1}); handle_cast({credit, ChPid, CTag, Credit, Drain}, - State = #q{backing_queue = BQ, + State = #q{consumers = Consumers, + backing_queue = BQ, backing_queue_state = BQS}) -> Len = BQ:len(BQS), rabbit_channel:send_credit_reply(ChPid, Len), - noreply(possibly_unblock(rabbit_queue_consumers:credit_fun( - Len == 0, Credit, Drain, CTag), - ChPid, State)); + noreply( + case rabbit_queue_consumers:credit(Len == 0, Credit, Drain, ChPid, CTag, + Consumers) of + unchanged -> State; + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, State1) + end); handle_cast(notify_decorators, State) -> notify_decorators(State), @@ -1324,14 +1230,3 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). - -log_cycle_once(Queues) -> - Key = {queue_cycle, Queues}, - case get(Key) of - true -> ok; - undefined -> rabbit_log:warning( - "Message dropped. Dead-letter queues cycle detected" ++ - ": ~p~nThis cycle will NOT be reported again.~n", - [Queues]), - put(Key, true) - end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eded8a90..469cf4f7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -194,6 +194,7 @@ force_event_refresh() -> init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), + ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, protocol = Protocol, @@ -272,7 +273,9 @@ handle_cast({method, Method, Content, Flow}, flow -> credit_flow:ack(Reader); noflow -> ok end, - try handle_method(Method, Content, State) of + try handle_method(rabbit_channel_interceptor:intercept_method( + expand_shortcuts(Method, State)), + Content, State) of {reply, Reply, NewState} -> ok = send(Reply, NewState), noreply(NewState); @@ -519,14 +522,19 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +qbin_to_resource(QueueNameBin, State) -> + name_to_resource(queue, QueueNameBin, State). + +name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> + rabbit_misc:r(VHostPath, Type, NameBin). + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath, - most_recently_declared_queue = MRDQ}) -> - rabbit_misc:r(VHostPath, queue, MRDQ); -expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) -> - rabbit_misc:r(VHostPath, queue, QueueNameBin). +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> + MRDQ; +expand_queue_name_shortcut(QueueNameBin, _) -> + QueueNameBin. expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = <<>>}) -> @@ -538,12 +546,22 @@ expand_routing_key_shortcut(<<>>, <<>>, expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. -expand_binding(queue, DestinationNameBin, RoutingKey, State) -> - {expand_queue_name_shortcut(DestinationNameBin, State), - expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)}; -expand_binding(exchange, DestinationNameBin, RoutingKey, State) -> - {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin), - RoutingKey}. +expand_shortcuts(#'basic.get' {queue = Q} = M, State) -> + M#'basic.get' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'basic.consume'{queue = Q} = M, State) -> + M#'basic.consume'{queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.delete' {queue = Q} = M, State) -> + M#'queue.delete' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.purge' {queue = Q} = M, State) -> + M#'queue.purge' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, State) -> + M#'queue.bind' {queue = expand_queue_name_shortcut(Q, State), + routing_key = expand_routing_key_shortcut(Q, K, State)}; +expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, State) -> + M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, State), + routing_key = expand_routing_key_shortcut(Q, K, State)}; +expand_shortcuts(M, _State) -> + M. check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> rabbit_misc:protocol_error( @@ -714,7 +732,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, conn_pid = ConnPid, limiter = Limiter, next_tag = DeliveryTag}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -752,7 +770,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of @@ -1062,7 +1080,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_empty = IfEmpty, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with( QueueName, @@ -1101,7 +1119,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = qbin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -1275,15 +1293,14 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid }) -> - {DestinationName, ActualRoutingKey} = - expand_binding(DestinationType, DestinationNameBin, RoutingKey, State), + DestinationName = name_to_resource(DestinationType, DestinationNameBin, State), check_write_permitted(DestinationName, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, State), case Fun(#binding{source = ExchangeName, destination = DestinationName, - key = ActualRoutingKey, + key = RoutingKey, args = Arguments}, fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ConnPid) diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl new file mode 100644 index 00000000..5d1665e0 --- /dev/null +++ b/src/rabbit_channel_interceptor.erl @@ -0,0 +1,91 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +%% Since the AMQP methods used here are queue related, +%% maybe we want this to be a queue_interceptor. + +-module(rabbit_channel_interceptor). + +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([intercept_method/1]). + +-ifdef(use_specs). + +-type(intercept_method() :: rabbit_framing:amqp_method_name()). +-type(original_method() :: rabbit_framing:amqp_method_record()). +-type(processed_method() :: rabbit_framing:amqp_method_record()). + +-callback description() -> [proplists:property()]. + +-callback intercept(original_method()) -> + rabbit_types:ok_or_error2(processed_method(), any()). + +%% Whether the interceptor wishes to intercept the amqp method +-callback applies_to(intercept_method()) -> boolean(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {intercept, 1}, {applies_to, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +%%---------------------------------------------------------------------------- + +intercept_method(#'basic.publish'{} = M) -> + M; +intercept_method(M) -> + intercept_method(M, select(rabbit_misc:method_record_type(M))). + +intercept_method(M, []) -> + M; +intercept_method(M, [I]) -> + case I:intercept(M) of + {ok, M2} -> + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) + end; + {error, Reason} -> + internal_error("Interceptor: ~p failed with reason: ~p", + [I, Reason]) + end; +intercept_method(M, Is) -> + internal_error("More than one interceptor for method: ~p -- ~p", + [rabbit_misc:method_record_type(M), Is]). + +%% select the interceptors that apply to intercept_method(). +select(Method) -> + [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor), + code:which(M) =/= non_existing, + M:applies_to(Method)]. + +validate_method(M, M2) -> + rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). + +internal_error(Format, Args) -> + rabbit_misc:protocol_error(internal_error, Format, Args).
\ No newline at end of file diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index df2e80ca..26f9700e 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,9 +47,9 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, - {tcp, Sock, Channel, FrameMax, - ReaderPid, Protocol}), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {tcp, Sock, Channel, FrameMax, + ReaderPid, Protocol, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), [WriterPid] = supervisor2:find_child(SupPid, writer), {ok, ChannelPid} = @@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, direct), + {ok, SupPid} = supervisor2:start_link( + ?MODULE, {direct, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), {ok, ChannelPid} = supervisor2:start_child( @@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, init(Type) -> {ok, {{one_for_all, 0, 1}, child_specs(Type)}}. -child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) -> +child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) -> [{writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid, true]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; -child_specs(direct) -> - [{limiter, {rabbit_limiter, start_link, []}, + [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} + | child_specs({direct, Identity})]; +child_specs({direct, Identity}) -> + [{limiter, {rabbit_limiter, start_link, [Identity]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}]. diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl index e51615e8..f268d8d6 100644 --- a/src/rabbit_connection_helper_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([start_channel_sup_sup/1, - start_queue_collector/1]). + start_queue_collector/2]). -export([init/1]). @@ -31,7 +31,8 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). --spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/2 :: (pid(), rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) -> {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). -start_queue_collector(SupPid) -> +start_queue_collector(SupPid, Identity) -> supervisor2:start_child( SupPid, - {collector, {rabbit_queue_collector, start_link, []}, + {collector, {rabbit_queue_collector, start_link, [Identity]}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl new file mode 100644 index 00000000..640b282e --- /dev/null +++ b/src/rabbit_dead_letter.erl @@ -0,0 +1,141 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_dead_letter). + +-export([publish/5]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(), + 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Msg, Reason, X, RK, QName) -> + DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), + Delivery = rabbit_basic:delivery(false, DLMsg, undefined), + {Queues, Cycles} = detect_cycles(Reason, DLMsg, + rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery), + ok. + +make_msg(Msg = #basic_message{content = Content, + exchange_name = Exchange, + routing_keys = RoutingKeys}, + Reason, DLX, RK, #resource{name = QName}) -> + {DeathRoutingKeys, HeadersFun1} = + case RK of + undefined -> {RoutingKeys, fun (H) -> H end}; + _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} + end, + ReasonBin = list_to_binary(atom_to_list(Reason)), + TimeSec = rabbit_misc:now_ms() div 1000, + PerMsgTTL = per_msg_ttl_header(Content#content.properties), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, + HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, + Info, Headers)) + end, + Content1 = #content{properties = Props} = + rabbit_basic:map_headers(HeadersFun2, Content), + Content2 = Content1#content{properties = + Props#'P_basic'{expiration = undefined}}, + Msg#basic_message{exchange_name = DLX, + id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, + content = Content2}. + +per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> + []; +per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> + [{<<"original-expiration">>, longstr, Expiration}]; +per_msg_ttl_header(_) -> + []. + +detect_cycles(expired, #basic_message{content = Content}, Queues) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + NoCycles = {Queues, []}, + case Headers of + undefined -> + NoCycles; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, Deaths} -> + {Cycling, NotCycling} = + lists:partition(fun (#resource{name = Queue}) -> + is_cycle(Queue, Deaths) + end, Queues), + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- Deaths], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; + _ -> + NoCycles + end + end; +detect_cycles(_Reason, _Msg, Queues) -> + {Queues, []}. + +is_cycle(Queue, Deaths) -> + {Cycle, Rest} = + lists:splitwith( + fun ({table, D}) -> + {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); + (_) -> + true + end, Deaths), + %% Is there a cycle, and if so, is it entirely due to expiry? + case Rest of + [] -> false; + [H|_] -> lists:all( + fun ({table, D}) -> + {longstr, <<"expired">>} =:= + rabbit_misc:table_lookup(D, <<"reason">>); + (_) -> + false + end, Cycle ++ [H]) + end. + +log_cycle_once(Queues) -> + Key = {queue_cycle, Queues}, + case get(Key) of + true -> ok; + undefined -> rabbit_log:warning( + "Message dropped. Dead-letter queues cycle detected" ++ + ": ~p~nThis cycle will NOT be reported again.~n", + [Queues]), + put(Key, true) + end. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ca67254b..ff9de67a 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,8 +16,8 @@ -module(rabbit_heartbeat). --export([start/6]). --export([start_heartbeat_sender/3, start_heartbeat_receiver/3, +-export([start/6, start/7]). +-export([start_heartbeat_sender/4, start_heartbeat_receiver/4, pause_monitor/1, resume_monitor/1]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -39,12 +39,17 @@ non_neg_integer(), heartbeat_callback(), non_neg_integer(), heartbeat_callback()) -> heartbeaters()). --spec(start_heartbeat_sender/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). --spec(start_heartbeat_receiver/3 :: - (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> - rabbit_types:ok(pid())). +-spec(start/7 :: + (pid(), rabbit_net:socket(), rabbit_types:proc_name(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). + +-spec(start_heartbeat_sender/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/4 :: + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -56,31 +61,35 @@ -endif. %%---------------------------------------------------------------------------- - start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + start(SupPid, Sock, unknown, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun). + +start(SupPid, Sock, Identity, + SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, - start_heartbeat_sender), + start_heartbeat_sender, Identity), {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, ReceiveFun, heartbeat_receiver, - start_heartbeat_receiver), + start_heartbeat_receiver, Identity), {Sender, Receiver}. -start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> SendFun(), continue end}). + fun () -> SendFun(), continue end}, Identity). -start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> ReceiveFun(), stop end}). + fun () -> ReceiveFun(), stop end}, Identity). pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. @@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. %%---------------------------------------------------------------------------- -start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback, + _Identity) -> {ok, none}; -start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback, + Identity) -> supervisor2:start_child( SupPid, {Name, - {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]}, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). -heartbeater(Params) -> +heartbeater(Params, Identity) -> Deb = sys:debug_options([]), - {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}. + {ok, proc_lib:spawn_link(fun () -> + rabbit_misc:store_proc_name(Identity), + heartbeater(Params, Deb, {0, 0}) + end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Deb, {StatVal, SameCount} = State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 2857ca55..d26b27fb 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -117,16 +117,18 @@ -module(rabbit_limiter). +-include("rabbit.hrl"). + -behaviour(gen_server2). --export([start_link/0]). +-export([start_link/1]). %% channel API -export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, is_prefetch_limited/1, is_blocked/1, is_active/1, get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, + is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -145,7 +147,8 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). -spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) @@ -168,8 +171,8 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), - boolean()) -> qstate()). +-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) + -> qstate()). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -193,7 +196,7 @@ %% API %%---------------------------------------------------------------------------- -start_link() -> gen_server2:start_link(?MODULE, [], []). +start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. @@ -276,9 +279,7 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> - Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)}; -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) -> +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. drained(Limiter = #qstate{credits = Credits}) -> @@ -322,7 +323,8 @@ update_credit(CTag, Credit, Drain, Credits) -> %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> {ok, #lim{}}. +init([ProcName]) -> ?store_proc_name(ProcName), + {ok, #lim{}}. prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; prioritise_call(_Msg, _From, _Len, _State) -> 0. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index a0e8bcc6..6661408c 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -323,6 +323,7 @@ ensure_monitoring(CPid, Pids) -> %% --------------------------------------------------------------------------- init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> + ?store_proc_name(QueueName), GM1 = case GM of undefined -> {ok, GM2} = gm:start_link( diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index d9cef642..4f50e1a5 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats, Log("~p messages to synchronise", [BQ:len(BQS)]), {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), Ref = make_ref(), - Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b578d1a6..cb2e272f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -79,6 +79,7 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). init(Q) -> + ?store_proc_name(Q#amqqueue.name), {ok, {not_started, Q}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -616,6 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, KS1 = lists:foldl(fun (ChPid0, KS0) -> pmon:demonitor(ChPid0, KS0) end, KS, AwaitGmDown), + rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName), rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 61e90105..e3fae4c0 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/7, slave/7]). +-export([master_prepare/4, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -61,7 +61,8 @@ -type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), bqs()}). --spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). +-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(), + log_fun(), [pid()]) -> pid()). -spec(master_go/7 :: (pid(), reference(), log_fun(), rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), @@ -80,9 +81,12 @@ %% --------------------------------------------------------------------------- %% Master -master_prepare(Ref, Log, SPids) -> +master_prepare(Ref, QName, Log, SPids) -> MPid = self(), - spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). + spawn_link(fun () -> + ?store_proc_name(QName), + syncer(Ref, Log, MPid, SPids) + end). master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 00c4eaf3..80e160d9 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -70,6 +70,7 @@ -export([interval_operation/4]). -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). +-export([store_proc_name/1, store_proc_name/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -248,6 +249,8 @@ -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). -spec(get_parent/0 :: () -> pid()). +-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). +-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -endif. %%---------------------------------------------------------------------------- @@ -1082,6 +1085,9 @@ stop_timer(State, Idx) -> end end. +store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). +store_proc_name(TypeProcName) -> put(process_name, TypeProcName). + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index e8c96818..401b8ab1 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -222,10 +222,9 @@ maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr); maybe_ntoab(Host) -> Host. rdns(Addr) -> - {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups), - case Lookup of - true -> list_to_binary(rabbit_networking:tcp_host(Addr)); - _ -> Addr + case application:get_env(rabbit, reverse_dns_lookups) of + {ok, true} -> list_to_binary(rabbit_networking:tcp_host(Addr)); + _ -> Addr end. sock_funs(inbound) -> {fun peername/1, fun sockname/1}; diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 6406f7e9..855c7995 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --export([start_link/0, register/2, delete_all/1]). +-export([start_link/1, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -31,7 +31,8 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_link/1 :: (rabbit_types:proc_name()) -> + rabbit_types:ok_pid_or_error()). -spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). @@ -39,8 +40,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link(?MODULE, [], []). +start_link(ProcName) -> + gen_server:start_link(?MODULE, [ProcName], []). register(CollectorPid, Q) -> gen_server:call(CollectorPid, {register, Q}, infinity). @@ -50,7 +51,8 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- -init([]) -> +init([ProcName]) -> + ?store_proc_name(ProcName), {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f06423f7..0a823366 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -20,8 +20,8 @@ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/2, possibly_unblock/3, - resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, - utilisation/1]). + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, + credit/6, utilisation/1]). %%---------------------------------------------------------------------------- @@ -84,11 +84,11 @@ -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. --spec resume_fun() -> cr_fun(). --spec notify_sent_fun(non_neg_integer()) -> cr_fun(). --spec activate_limit_fun() -> cr_fun(). --spec credit_fun(boolean(), non_neg_integer(), boolean(), - rabbit_types:ctag()) -> cr_fun(). +-spec resume_fun() -> cr_fun(). +-spec notify_sent_fun(non_neg_integer()) -> cr_fun(). +-spec activate_limit_fun() -> cr_fun(). +-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), + state()) -> 'unchanged' | {'unblocked', state()}. -spec utilisation(state()) -> ratio(). -endif. @@ -131,7 +131,7 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, Limiter2 = case CreditArgs of none -> Limiter1; {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, IsEmpty, Drain) + Limiter1, ConsumerTag, Crd, Drain) end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter2}, @@ -306,13 +306,24 @@ activate_limit_fun() -> C#cr{limiter = rabbit_limiter:activate(Limiter)} end. -credit_fun(IsEmpty, Credit, Drain, CTag) -> - fun (C = #cr{limiter = Limiter}) -> +credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> + case lookup_ch(ChPid) of + not_found -> + unchanged; + #cr{limiter = Limiter} = C -> C1 = C#cr{limiter = rabbit_limiter:credit( - Limiter, CTag, Credit, IsEmpty, Drain)}, - case Drain andalso IsEmpty of - true -> send_drained(C1); - false -> C1 + Limiter, CTag, Credit, Drain)}, + C2 = #cr{limiter = Limiter1} = + case Drain andalso IsEmpty of + true -> send_drained(C1); + false -> C1 + end, + case is_ch_blocked(C2) orelse + (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse + rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of + true -> update_ch_record(C2), + unchanged; + false -> unblock(C2, State) end end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d9879f1b..8553e36d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -214,6 +214,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), + ?store_proc_name(list_to_binary(Name)), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -877,15 +878,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, frame_max, ?FRAME_MIN_SIZE, FrameMax), ok = validate_negotiated_integer_value( channel_max, ?CHANNEL_MIN, ChannelMax), - {ok, Collector} = - rabbit_connection_helper_sup:start_queue_collector(SupPid), + {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector( + SupPid, Connection#connection.name), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, ReceiveFun), + Heartbeater = rabbit_heartbeat:start( + SupPid, Sock, Connection#connection.name, + ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ frame_max = FrameMax, diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 3014aeb7..abb71e7a 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(queue_decorator) -> rabbit_queue_decorator; -class_module(policy_validator) -> rabbit_policy_validator; -class_module(ha_mode) -> rabbit_mirror_queue_mode. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; +class_module(policy_validator) -> rabbit_policy_validator; +class_module(ha_mode) -> rabbit_mirror_queue_mode; +class_module(channel_interceptor) -> rabbit_channel_interceptor. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bb923440..07925dbf 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1262,7 +1262,7 @@ test_writer(Pid) -> test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], Me, Limiter), @@ -2816,7 +2816,7 @@ test_queue_recover() -> end, rabbit_amqqueue:stop(), rabbit_amqqueue:start(rabbit_amqqueue:recover()), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> @@ -2843,7 +2843,7 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:set_ram_duration_target(QPid, 0), - {ok, Limiter} = rabbit_limiter:start_link(), + {ok, Limiter} = rabbit_limiter:start_link(no_id), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index a36613db..0edebff1 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -30,7 +30,8 @@ connection/0, protocol/0, user/0, internal_user/0, username/0, password/0, password_hash/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, - channel_exit/0, connection_exit/0, mfargs/0]). + channel_exit/0, connection_exit/0, mfargs/0, proc_name/0, + proc_type_and_name/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +157,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). +-type(proc_name() :: term()). +-type(proc_type_and_name() :: {atom(), proc_name()}). + -endif. % use_specs diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 34dd3d3b..3571692b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, start/6, start_link/6]). +-export([start/6, start_link/6, start/7, start_link/7]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -30,7 +30,7 @@ -export([internal_send_command/4, internal_send_command/6]). %% internal --export([mainloop/2, mainloop1/2]). +-export([enter_mainloop/2, mainloop/2, mainloop1/2]). -record(wstate, {sock, channel, frame_max, protocol, reader, stats_timer, pending}). @@ -41,21 +41,25 @@ -ifdef(use_specs). --spec(start/5 :: +-spec(start/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name()) -> rabbit_types:ok(pid())). --spec(start_link/5 :: +-spec(start_link/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name()) -> rabbit_types:ok(pid())). --spec(start/6 :: +-spec(start/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid())). --spec(start_link/6 :: +-spec(start_link/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid())). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). @@ -99,23 +103,23 @@ %%--------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - start(Sock, Channel, FrameMax, Protocol, ReaderPid, false). +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> + start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false). +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> + start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false). -start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - Deb = sys:debug_options([]), - {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}. + {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}. -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - Deb = sys:debug_options([]), - {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}. + {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}. initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> (case ReaderWantsStats of @@ -138,6 +142,11 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +enter_mainloop(Identity, State) -> + Deb = sys:debug_options([]), + ?store_proc_name(Identity), + mainloop(Deb, State). + mainloop(Deb, State) -> try mainloop1(Deb, State) |