summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-13 10:54:43 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-13 10:54:43 +0000
commit20a47dd600054b1ff0d60cefc81f0f743765eb4b (patch)
tree23a915a8144f74404e7e2c3c474517e927e65a7f
parent15ec271e89915308fdaa88ab94481a4f31a00c7c (diff)
parente212cea92aaece5e7c49019d25be2abfd6d8db47 (diff)
downloadrabbitmq-server-20a47dd600054b1ff0d60cefc81f0f743765eb4b.tar.gz
merge default into bug25827
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/gm.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl127
-rw-r--r--src/rabbit_channel.erl55
-rw-r--r--src/rabbit_channel_interceptor.erl91
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_connection_helper_sup.erl9
-rw-r--r--src/rabbit_dead_letter.erl141
-rw-r--r--src/rabbit_heartbeat.erl55
-rw-r--r--src/rabbit_limiter.erl22
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl1
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl12
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_net.erl7
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_queue_consumers.erl39
-rw-r--r--src/rabbit_reader.erl11
-rw-r--r--src/rabbit_registry.erl15
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_writer.erl49
23 files changed, 453 insertions, 249 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index bd7a0eed..afb6e576 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -111,3 +111,5 @@
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
+
+-define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)).
diff --git a/src/gm.erl b/src/gm.erl
index 098d84fa..df1c258d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -542,6 +542,7 @@ forget_group(GroupName) ->
ok.
init([GroupName, Module, Args, TxnFun]) ->
+ put(process_name, {?MODULE, GroupName}),
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
@@ -898,13 +899,10 @@ internal_broadcast(Msg, From, State = #state { self = Self,
State1 = State #state { pub_count = PubCount1,
confirms = Confirms1,
broadcast_buffer = Buffer1 },
- case From =/= none of
- true ->
- handle_callback_result({Result, flush_broadcast_buffer(State1)});
- false ->
- handle_callback_result(
- {Result, State1 #state { broadcast_buffer = Buffer1 }})
- end.
+ handle_callback_result({Result, case From of
+ none -> State1;
+ _ -> flush_broadcast_buffer(State1)
+ end}).
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c3de2c08..5c098755 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -107,6 +107,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
+ ?store_proc_name(Q#amqqueue.name),
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -722,117 +723,17 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
- dead_letter_publish(Msg, Reason, X, RK, QName),
+ rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),
{Res, State#q{backing_queue_state = BQS2}}.
-dead_letter_publish(Msg, Reason, X, RK, QName) ->
- DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
- {Queues, Cycles} = detect_dead_letter_cycles(
- Reason, DLMsg, rabbit_exchange:route(X, Delivery)),
- lists:foreach(fun log_cycle_once/1, Cycles),
- rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
- ok.
-
stop(State) -> stop(noreply, State).
stop(noreply, State) -> {stop, normal, State};
stop(Reply, State) -> {stop, normal, Reply, State}.
-
-detect_dead_letter_cycles(expired,
- #basic_message{content = Content}, Queues) ->
- #content{properties = #'P_basic'{headers = Headers}} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- NoCycles = {Queues, []},
- case Headers of
- undefined ->
- NoCycles;
- _ ->
- case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
- {array, Deaths} ->
- {Cycling, NotCycling} =
- lists:partition(
- fun (#resource{name = Queue}) ->
- is_dead_letter_cycle(Queue, Deaths)
- end, Queues),
- OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
- {table, D} <- Deaths],
- OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- {NotCycling, [[QName | OldQueues1] ||
- #resource{name = QName} <- Cycling]};
- _ ->
- NoCycles
- end
- end;
-detect_dead_letter_cycles(_Reason, _Msg, Queues) ->
- {Queues, []}.
-
-is_dead_letter_cycle(Queue, Deaths) ->
- {Cycle, Rest} =
- lists:splitwith(
- fun ({table, D}) ->
- {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
- (_) ->
- true
- end, Deaths),
- %% Is there a cycle, and if so, is it entirely due to expiry?
- case Rest of
- [] -> false;
- [H|_] -> lists:all(
- fun ({table, D}) ->
- {longstr, <<"expired">>} =:=
- rabbit_misc:table_lookup(D, <<"reason">>);
- (_) ->
- false
- end, Cycle ++ [H])
- end.
-
-make_dead_letter_msg(Msg = #basic_message{content = Content,
- exchange_name = Exchange,
- routing_keys = RoutingKeys},
- Reason, DLX, RK, #resource{name = QName}) ->
- {DeathRoutingKeys, HeadersFun1} =
- case RK of
- undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
- end,
- ReasonBin = list_to_binary(atom_to_list(Reason)),
- TimeSec = rabbit_misc:now_ms() div 1000,
- PerMsgTTL = per_msg_ttl_header(Content#content.properties),
- HeadersFun2 =
- fun (Headers) ->
- %% The first routing key is the one specified in the
- %% basic.publish; all others are CC or BCC keys.
- RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- RKs1 = [{longstr, Key} || Key <- RKs],
- Info = [{<<"reason">>, longstr, ReasonBin},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, TimeSec},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
- HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
- Info, Headers))
- end,
- Content1 = #content{properties = Props} =
- rabbit_basic:map_headers(HeadersFun2, Content),
- Content2 = Content1#content{properties =
- Props#'P_basic'{expiration = undefined}},
- Msg#basic_message{exchange_name = DLX,
- id = rabbit_guid:gen(),
- routing_keys = DeathRoutingKeys,
- content = Content2}.
-
-per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
- [];
-per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
- [{<<"original-expiration">>, longstr, Expiration}];
-per_msg_ttl_header(_) ->
- [].
-
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1217,13 +1118,18 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS1});
handle_cast({credit, ChPid, CTag, Credit, Drain},
- State = #q{backing_queue = BQ,
+ State = #q{consumers = Consumers,
+ backing_queue = BQ,
backing_queue_state = BQS}) ->
Len = BQ:len(BQS),
rabbit_channel:send_credit_reply(ChPid, Len),
- noreply(possibly_unblock(rabbit_queue_consumers:credit_fun(
- Len == 0, Credit, Drain, CTag),
- ChPid, State));
+ noreply(
+ case rabbit_queue_consumers:credit(Len == 0, Credit, Drain, ChPid, CTag,
+ Consumers) of
+ unchanged -> State;
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, State1)
+ end);
handle_cast(notify_decorators, State) ->
notify_decorators(State),
@@ -1324,14 +1230,3 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
-
-log_cycle_once(Queues) ->
- Key = {queue_cycle, Queues},
- case get(Key) of
- true -> ok;
- undefined -> rabbit_log:warning(
- "Message dropped. Dead-letter queues cycle detected" ++
- ": ~p~nThis cycle will NOT be reported again.~n",
- [Queues]),
- put(Key, true)
- end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eded8a90..469cf4f7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -194,6 +194,7 @@ force_event_refresh() ->
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
+ ?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
protocol = Protocol,
@@ -272,7 +273,9 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
- try handle_method(Method, Content, State) of
+ try handle_method(rabbit_channel_interceptor:intercept_method(
+ expand_shortcuts(Method, State)),
+ Content, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
noreply(NewState);
@@ -519,14 +522,19 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+qbin_to_resource(QueueNameBin, State) ->
+ name_to_resource(queue, QueueNameBin, State).
+
+name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
+ rabbit_misc:r(VHostPath, Type, NameBin).
+
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath,
- most_recently_declared_queue = MRDQ}) ->
- rabbit_misc:r(VHostPath, queue, MRDQ);
-expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) ->
- rabbit_misc:r(VHostPath, queue, QueueNameBin).
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
+ MRDQ;
+expand_queue_name_shortcut(QueueNameBin, _) ->
+ QueueNameBin.
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = <<>>}) ->
@@ -538,12 +546,22 @@ expand_routing_key_shortcut(<<>>, <<>>,
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
-expand_binding(queue, DestinationNameBin, RoutingKey, State) ->
- {expand_queue_name_shortcut(DestinationNameBin, State),
- expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)};
-expand_binding(exchange, DestinationNameBin, RoutingKey, State) ->
- {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin),
- RoutingKey}.
+expand_shortcuts(#'basic.get' {queue = Q} = M, State) ->
+ M#'basic.get' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'basic.consume'{queue = Q} = M, State) ->
+ M#'basic.consume'{queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.delete' {queue = Q} = M, State) ->
+ M#'queue.delete' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.purge' {queue = Q} = M, State) ->
+ M#'queue.purge' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, State) ->
+ M#'queue.bind' {queue = expand_queue_name_shortcut(Q, State),
+ routing_key = expand_routing_key_shortcut(Q, K, State)};
+expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, State) ->
+ M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, State),
+ routing_key = expand_routing_key_shortcut(Q, K, State)};
+expand_shortcuts(M, _State) ->
+ M.
check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
rabbit_misc:protocol_error(
@@ -714,7 +732,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
conn_pid = ConnPid,
limiter = Limiter,
next_tag = DeliveryTag}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -752,7 +770,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
@@ -1062,7 +1080,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_empty = IfEmpty,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with(
QueueName,
@@ -1101,7 +1119,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -1275,15 +1293,14 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid }) ->
- {DestinationName, ActualRoutingKey} =
- expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
+ DestinationName = name_to_resource(DestinationType, DestinationNameBin, State),
check_write_permitted(DestinationName, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, State),
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
- key = ActualRoutingKey,
+ key = RoutingKey,
args = Arguments},
fun (_X, Q = #amqqueue{}) ->
try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
new file mode 100644
index 00000000..5d1665e0
--- /dev/null
+++ b/src/rabbit_channel_interceptor.erl
@@ -0,0 +1,91 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+%% Since the AMQP methods used here are queue related,
+%% maybe we want this to be a queue_interceptor.
+
+-module(rabbit_channel_interceptor).
+
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([intercept_method/1]).
+
+-ifdef(use_specs).
+
+-type(intercept_method() :: rabbit_framing:amqp_method_name()).
+-type(original_method() :: rabbit_framing:amqp_method_record()).
+-type(processed_method() :: rabbit_framing:amqp_method_record()).
+
+-callback description() -> [proplists:property()].
+
+-callback intercept(original_method()) ->
+ rabbit_types:ok_or_error2(processed_method(), any()).
+
+%% Whether the interceptor wishes to intercept the amqp method
+-callback applies_to(intercept_method()) -> boolean().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {intercept, 1}, {applies_to, 1}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+intercept_method(#'basic.publish'{} = M) ->
+ M;
+intercept_method(M) ->
+ intercept_method(M, select(rabbit_misc:method_record_type(M))).
+
+intercept_method(M, []) ->
+ M;
+intercept_method(M, [I]) ->
+ case I:intercept(M) of
+ {ok, M2} ->
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
+ end;
+ {error, Reason} ->
+ internal_error("Interceptor: ~p failed with reason: ~p",
+ [I, Reason])
+ end;
+intercept_method(M, Is) ->
+ internal_error("More than one interceptor for method: ~p -- ~p",
+ [rabbit_misc:method_record_type(M), Is]).
+
+%% select the interceptors that apply to intercept_method().
+select(Method) ->
+ [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor),
+ code:which(M) =/= non_existing,
+ M:applies_to(Method)].
+
+validate_method(M, M2) ->
+ rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+
+internal_error(Format, Args) ->
+ rabbit_misc:protocol_error(internal_error, Format, Args). \ No newline at end of file
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index df2e80ca..26f9700e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,9 +47,9 @@
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE,
- {tcp, Sock, Channel, FrameMax,
- ReaderPid, Protocol}),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {tcp, Sock, Channel, FrameMax,
+ ReaderPid, Protocol, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
[WriterPid] = supervisor2:find_child(SupPid, writer),
{ok, ChannelPid} =
@@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE, direct),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
{ok, ChannelPid} =
supervisor2:start_child(
@@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
init(Type) ->
{ok, {{one_for_all, 0, 1}, child_specs(Type)}}.
-child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) ->
+child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) ->
[{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid, true]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
-child_specs(direct) ->
- [{limiter, {rabbit_limiter, start_link, []},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}
+ | child_specs({direct, Identity})];
+child_specs({direct, Identity}) ->
+ [{limiter, {rabbit_limiter, start_link, [Identity]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]}].
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index e51615e8..f268d8d6 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -20,7 +20,7 @@
-export([start_link/0]).
-export([start_channel_sup_sup/1,
- start_queue_collector/1]).
+ start_queue_collector/2]).
-export([init/1]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
--spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/2 :: (pid(), rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) ->
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
-start_queue_collector(SupPid) ->
+start_queue_collector(SupPid, Identity) ->
supervisor2:start_child(
SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
+ {collector, {rabbit_queue_collector, start_link, [Identity]},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
new file mode 100644
index 00000000..640b282e
--- /dev/null
+++ b/src/rabbit_dead_letter.erl
@@ -0,0 +1,141 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_dead_letter).
+
+-export([publish/5]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(),
+ 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+publish(Msg, Reason, X, RK, QName) ->
+ DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
+ {Queues, Cycles} = detect_cycles(Reason, DLMsg,
+ rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
+
+make_msg(Msg = #basic_message{content = Content,
+ exchange_name = Exchange,
+ routing_keys = RoutingKeys},
+ Reason, DLX, RK, #resource{name = QName}) ->
+ {DeathRoutingKeys, HeadersFun1} =
+ case RK of
+ undefined -> {RoutingKeys, fun (H) -> H end};
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
+ TimeSec = rabbit_misc:now_ms() div 1000,
+ PerMsgTTL = per_msg_ttl_header(Content#content.properties),
+ HeadersFun2 =
+ fun (Headers) ->
+ %% The first routing key is the one specified in the
+ %% basic.publish; all others are CC or BCC keys.
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
+ HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
+ Info, Headers))
+ end,
+ Content1 = #content{properties = Props} =
+ rabbit_basic:map_headers(HeadersFun2, Content),
+ Content2 = Content1#content{properties =
+ Props#'P_basic'{expiration = undefined}},
+ Msg#basic_message{exchange_name = DLX,
+ id = rabbit_guid:gen(),
+ routing_keys = DeathRoutingKeys,
+ content = Content2}.
+
+per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
+ [];
+per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
+ [{<<"original-expiration">>, longstr, Expiration}];
+per_msg_ttl_header(_) ->
+ [].
+
+detect_cycles(expired, #basic_message{content = Content}, Queues) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ NoCycles = {Queues, []},
+ case Headers of
+ undefined ->
+ NoCycles;
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, Deaths} ->
+ {Cycling, NotCycling} =
+ lists:partition(fun (#resource{name = Queue}) ->
+ is_cycle(Queue, Deaths)
+ end, Queues),
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- Deaths],
+ OldQueues1 = [QName || {longstr, QName} <- OldQueues],
+ {NotCycling, [[QName | OldQueues1] ||
+ #resource{name = QName} <- Cycling]};
+ _ ->
+ NoCycles
+ end
+ end;
+detect_cycles(_Reason, _Msg, Queues) ->
+ {Queues, []}.
+
+is_cycle(Queue, Deaths) ->
+ {Cycle, Rest} =
+ lists:splitwith(
+ fun ({table, D}) ->
+ {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
+ (_) ->
+ true
+ end, Deaths),
+ %% Is there a cycle, and if so, is it entirely due to expiry?
+ case Rest of
+ [] -> false;
+ [H|_] -> lists:all(
+ fun ({table, D}) ->
+ {longstr, <<"expired">>} =:=
+ rabbit_misc:table_lookup(D, <<"reason">>);
+ (_) ->
+ false
+ end, Cycle ++ [H])
+ end.
+
+log_cycle_once(Queues) ->
+ Key = {queue_cycle, Queues},
+ case get(Key) of
+ true -> ok;
+ undefined -> rabbit_log:warning(
+ "Message dropped. Dead-letter queues cycle detected" ++
+ ": ~p~nThis cycle will NOT be reported again.~n",
+ [Queues]),
+ put(Key, true)
+ end.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ca67254b..ff9de67a 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,8 @@
-module(rabbit_heartbeat).
--export([start/6]).
--export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+-export([start/6, start/7]).
+-export([start_heartbeat_sender/4, start_heartbeat_receiver/4,
pause_monitor/1, resume_monitor/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -39,12 +39,17 @@
non_neg_integer(), heartbeat_callback(),
non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
--spec(start_heartbeat_sender/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
--spec(start_heartbeat_receiver/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
+-spec(start/7 ::
+ (pid(), rabbit_net:socket(), rabbit_types:proc_name(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
+
+-spec(start_heartbeat_sender/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -56,31 +61,35 @@
-endif.
%%----------------------------------------------------------------------------
-
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ start(SupPid, Sock, unknown,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun).
+
+start(SupPid, Sock, Identity,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
{ok, Sender} =
start_heartbeater(SendTimeoutSec, SupPid, Sock,
SendFun, heartbeat_sender,
- start_heartbeat_sender),
+ start_heartbeat_sender, Identity),
{ok, Receiver} =
start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
+ start_heartbeat_receiver, Identity),
{Sender, Receiver}.
-start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () -> SendFun(), continue end}).
+ fun () -> SendFun(), continue end}, Identity).
-start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () -> ReceiveFun(), stop end}).
+ fun () -> ReceiveFun(), stop end}, Identity).
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
@@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
%%----------------------------------------------------------------------------
-start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback,
+ _Identity) ->
{ok, none};
-start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback,
+ Identity) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
-heartbeater(Params) ->
+heartbeater(Params, Identity) ->
Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}.
+ {ok, proc_lib:spawn_link(fun () ->
+ rabbit_misc:store_proc_name(Identity),
+ heartbeater(Params, Deb, {0, 0})
+ end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Deb, {StatVal, SameCount} = State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2857ca55..d26b27fb 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -117,16 +117,18 @@
-module(rabbit_limiter).
+-include("rabbit.hrl").
+
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/1]).
%% channel API
-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
is_prefetch_limited/1, is_blocked/1, is_active/1,
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
+ is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -145,7 +147,8 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
@@ -168,8 +171,8 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
- boolean()) -> qstate()).
+-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
+ -> qstate()).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -193,7 +196,7 @@
%% API
%%----------------------------------------------------------------------------
-start_link() -> gen_server2:start_link(?MODULE, [], []).
+start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
@@ -276,9 +279,7 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
- Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)};
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) ->
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
drained(Limiter = #qstate{credits = Credits}) ->
@@ -322,7 +323,8 @@ update_credit(CTag, Credit, Drain, Credits) ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) -> {ok, #lim{}}.
+init([ProcName]) -> ?store_proc_name(ProcName),
+ {ok, #lim{}}.
prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
prioritise_call(_Msg, _From, _Len, _State) -> 0.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index a0e8bcc6..6661408c 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -323,6 +323,7 @@ ensure_monitoring(CPid, Pids) ->
%% ---------------------------------------------------------------------------
init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
+ ?store_proc_name(QueueName),
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d9cef642..4f50e1a5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats,
Log("~p messages to synchronise", [BQ:len(BQS)]),
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
Ref = make_ref(),
- Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b578d1a6..cb2e272f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -79,6 +79,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
init(Q) ->
+ ?store_proc_name(Q#amqqueue.name),
{ok, {not_started, Q}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
@@ -616,6 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
+ rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName),
rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 61e90105..e3fae4c0 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/7, slave/7]).
+-export([master_prepare/4, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -61,7 +61,8 @@
-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
bqs()}).
--spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(),
+ log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
@@ -80,9 +81,12 @@
%% ---------------------------------------------------------------------------
%% Master
-master_prepare(Ref, Log, SPids) ->
+master_prepare(Ref, QName, Log, SPids) ->
MPid = self(),
- spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+ spawn_link(fun () ->
+ ?store_proc_name(QName),
+ syncer(Ref, Log, MPid, SPids)
+ end).
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 00c4eaf3..80e160d9 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -70,6 +70,7 @@
-export([interval_operation/4]).
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
+-export([store_proc_name/1, store_proc_name/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -248,6 +249,8 @@
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
-spec(get_parent/0 :: () -> pid()).
+-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
+-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-endif.
%%----------------------------------------------------------------------------
@@ -1082,6 +1085,9 @@ stop_timer(State, Idx) ->
end
end.
+store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
+store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index e8c96818..401b8ab1 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -222,10 +222,9 @@ maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
maybe_ntoab(Host) -> Host.
rdns(Addr) ->
- {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups),
- case Lookup of
- true -> list_to_binary(rabbit_networking:tcp_host(Addr));
- _ -> Addr
+ case application:get_env(rabbit, reverse_dns_lookups) of
+ {ok, true} -> list_to_binary(rabbit_networking:tcp_host(Addr));
+ _ -> Addr
end.
sock_funs(inbound) -> {fun peername/1, fun sockname/1};
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 6406f7e9..855c7995 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server).
--export([start_link/0, register/2, delete_all/1]).
+-export([start_link/1, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
@@ -39,8 +40,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
+start_link(ProcName) ->
+ gen_server:start_link(?MODULE, [ProcName], []).
register(CollectorPid, Q) ->
gen_server:call(CollectorPid, {register, Q}, infinity).
@@ -50,7 +51,8 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
-init([]) ->
+init([ProcName]) ->
+ ?store_proc_name(ProcName),
{ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f06423f7..0a823366 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -20,8 +20,8 @@
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
- resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
- utilisation/1]).
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
+ credit/6, utilisation/1]).
%%----------------------------------------------------------------------------
@@ -84,11 +84,11 @@
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
--spec resume_fun() -> cr_fun().
--spec notify_sent_fun(non_neg_integer()) -> cr_fun().
--spec activate_limit_fun() -> cr_fun().
--spec credit_fun(boolean(), non_neg_integer(), boolean(),
- rabbit_types:ctag()) -> cr_fun().
+-spec resume_fun() -> cr_fun().
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+-spec activate_limit_fun() -> cr_fun().
+-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
+ state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
-endif.
@@ -131,7 +131,7 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
Limiter2 = case CreditArgs of
none -> Limiter1;
{Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, IsEmpty, Drain)
+ Limiter1, ConsumerTag, Crd, Drain)
end,
C1 = C#cr{consumer_count = Count + 1,
limiter = Limiter2},
@@ -306,13 +306,24 @@ activate_limit_fun() ->
C#cr{limiter = rabbit_limiter:activate(Limiter)}
end.
-credit_fun(IsEmpty, Credit, Drain, CTag) ->
- fun (C = #cr{limiter = Limiter}) ->
+credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ unchanged;
+ #cr{limiter = Limiter} = C ->
C1 = C#cr{limiter = rabbit_limiter:credit(
- Limiter, CTag, Credit, IsEmpty, Drain)},
- case Drain andalso IsEmpty of
- true -> send_drained(C1);
- false -> C1
+ Limiter, CTag, Credit, Drain)},
+ C2 = #cr{limiter = Limiter1} =
+ case Drain andalso IsEmpty of
+ true -> send_drained(C1);
+ false -> C1
+ end,
+ case is_ch_blocked(C2) orelse
+ (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
+ rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
+ true -> update_ch_record(C2),
+ unchanged;
+ false -> unblock(C2, State)
end
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d9879f1b..8553e36d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -214,6 +214,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
+ ?store_proc_name(list_to_binary(Name)),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -877,15 +878,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
frame_max, ?FRAME_MIN_SIZE, FrameMax),
ok = validate_negotiated_integer_value(
channel_max, ?CHANNEL_MIN, ChannelMax),
- {ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(
+ SupPid, Connection#connection.name),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
+ Heartbeater = rabbit_heartbeat:start(
+ SupPid, Sock, Connection#connection.name,
+ ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
frame_max = FrameMax,
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 3014aeb7..abb71e7a 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(queue_decorator) -> rabbit_queue_decorator;
-class_module(policy_validator) -> rabbit_policy_validator;
-class_module(ha_mode) -> rabbit_mirror_queue_mode.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
+class_module(policy_validator) -> rabbit_policy_validator;
+class_module(ha_mode) -> rabbit_mirror_queue_mode;
+class_module(channel_interceptor) -> rabbit_channel_interceptor.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bb923440..07925dbf 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1262,7 +1262,7 @@ test_writer(Pid) ->
test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], Me, Limiter),
@@ -2816,7 +2816,7 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
@@ -2843,7 +2843,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index a36613db..0edebff1 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -30,7 +30,8 @@
connection/0, protocol/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
- channel_exit/0, connection_exit/0, mfargs/0]).
+ channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
+ proc_type_and_name/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +157,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
+-type(proc_name() :: term()).
+-type(proc_type_and_name() :: {atom(), proc_name()}).
+
-endif. % use_specs
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 34dd3d3b..3571692b 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, start/6, start_link/6]).
+-export([start/6, start_link/6, start/7, start_link/7]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -30,7 +30,7 @@
-export([internal_send_command/4, internal_send_command/6]).
%% internal
--export([mainloop/2, mainloop1/2]).
+-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
-record(wstate, {sock, channel, frame_max, protocol, reader,
stats_timer, pending}).
@@ -41,21 +41,25 @@
-ifdef(use_specs).
--spec(start/5 ::
+-spec(start/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name())
-> rabbit_types:ok(pid())).
--spec(start_link/5 ::
+-spec(start_link/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name())
-> rabbit_types:ok(pid())).
--spec(start/6 ::
+-spec(start/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
@@ -99,23 +103,23 @@
%%---------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
+ start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false).
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
+ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false).
-start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -138,6 +142,11 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+enter_mainloop(Identity, State) ->
+ Deb = sys:debug_options([]),
+ ?store_proc_name(Identity),
+ mainloop(Deb, State).
+
mainloop(Deb, State) ->
try
mainloop1(Deb, State)