summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2014-01-23 15:53:52 +0000
committerEmile Joubert <emile@rabbitmq.com>2014-01-23 15:53:52 +0000
commit660df83677206e69ab5a4e4936531345110f9765 (patch)
tree0112f03fdad3ca5e5e80267a75601b0b4bd76768
parent7456792ec74cdf72afa7ddc54935b22e933f4316 (diff)
parent0148ab85facdb6b827d4305b1ceba44e532b2c7e (diff)
downloadrabbitmq-server-660df83677206e69ab5a4e4936531345110f9765.tar.gz
Merged stable to default
-rw-r--r--codegen.py8
-rw-r--r--docs/rabbitmq.config.example5
-rw-r--r--docs/rabbitmqctl.1.xml36
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/credit_flow.erl21
-rw-r--r--src/gm.erl1
-rw-r--r--src/rabbit_access_control.erl29
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_amqqueue_process.erl651
-rw-r--r--src/rabbit_binary_generator.erl61
-rw-r--r--src/rabbit_binary_parser.erl46
-rw-r--r--src/rabbit_channel.erl191
-rw-r--r--src/rabbit_channel_interceptor.erl91
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_connection_helper_sup.erl9
-rw-r--r--src/rabbit_control_main.erl9
-rw-r--r--src/rabbit_dead_letter.erl141
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_heartbeat.erl55
-rw-r--r--src/rabbit_limiter.erl100
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl1
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl12
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_net.erl7
-rw-r--r--src/rabbit_node_monitor.erl6
-rw-r--r--src/rabbit_nodes.erl11
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_queue_consumers.erl440
-rw-r--r--src/rabbit_queue_decorator.erl13
-rw-r--r--src/rabbit_reader.erl403
-rw-r--r--src/rabbit_registry.erl15
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_upgrade_functions.erl15
-rw-r--r--src/rabbit_vhost.erl20
-rw-r--r--src/rabbit_writer.erl49
39 files changed, 1477 insertions, 1059 deletions
diff --git a/codegen.py b/codegen.py
index 842549cf..91fa1154 100644
--- a/codegen.py
+++ b/codegen.py
@@ -187,7 +187,7 @@ def genErl(spec):
elif type == 'table':
return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary'
- def genFieldPostprocessing(packed):
+ def genFieldPostprocessing(packed, hasContent):
for f in packed:
type = erlType(f.domain)
if type == 'bit':
@@ -199,6 +199,10 @@ def genErl(spec):
elif type == 'table':
print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \
(f.index, f.index)
+ # We skip the check on content-bearing methods for
+ # speed. This is a sanity check, not a security thing.
+ elif type == 'shortstr' and not hasContent:
+ print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index)
else:
pass
@@ -214,7 +218,7 @@ def genErl(spec):
restSeparator = ''
recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments))
print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern)
- genFieldPostprocessing(packedFields)
+ genFieldPostprocessing(packedFields, m.hasContent)
print " %s;" % (recordConstructorExpr,)
def genDecodeProperties(c):
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 91402649..c0d6cc70 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -138,6 +138,11 @@
%%
%% {frame_max, 131072},
+ %% Set the max permissible number of channels per connection.
+ %% 0 means "no limit".
+ %%
+ %% {channel_max, 128},
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6ec7ee07..d19acd00 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1135,6 +1135,13 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>consumer_utilisation</term>
+ <listitem><para>Fraction of the time (between 0.0 and 1.0)
+ that the queue is able to immediately deliver messages to
+ consumers. This can be less than 1.0 if consumers are limited
+ by network congestion or prefetch count.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
@@ -1390,24 +1397,9 @@
</varlistentry>
<varlistentry>
- <term>last_blocked_by</term>
- <listitem><para>The reason for which this connection
- was last blocked. One of 'resource' - due to a memory
- or disk alarm, 'flow' - due to internal flow control, or
- 'none' if the connection was never
- blocked.</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>last_blocked_age</term>
- <listitem><para>Time, in seconds, since this
- connection was last blocked, or
- 'infinity'.</para></listitem>
- </varlistentry>
-
- <varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
@@ -1438,6 +1430,10 @@
<listitem><para>Maximum frame size (bytes).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>channel_max</term>
+ <listitem><para>Maximum number of channels on this connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>client_properties</term>
<listitem><para>Informational properties transmitted by the client
during connection establishment.</para></listitem>
@@ -1563,14 +1559,6 @@
<term>prefetch_count</term>
<listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
</varlistentry>
- <varlistentry>
- <term>client_flow_blocked</term>
- <listitem><para>True if the client issued a
- <command>channel.flow{active=false}</command>
- command, blocking the server from delivering
- messages to the channel's consumers.
- </para></listitem>
- </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index f0fee96a..29f06e79 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -25,6 +25,7 @@
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
{frame_max, 131072},
+ {channel_max, 0},
{heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0f1b7a50..8b42cdea 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -118,3 +118,5 @@
%% to allow plenty of leeway for the #basic_message{} and #content{}
%% wrapping the message body).
-define(MAX_MSG_SIZE, 2147383648).
+
+-define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)).
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index d48d649e..39a257ac 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -30,7 +30,7 @@
-define(DEFAULT_CREDIT, {200, 50}).
--export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of
_ -> true
end.
+state() -> case blocked() of
+ true -> flow;
+ false -> case get(credit_blocked_at) of
+ undefined -> running;
+ B -> Diff = timer:now_diff(erlang:now(), B),
+ case Diff < 5000000 of
+ true -> flow;
+ false -> running
+ end
+ end
+ end.
+
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain
@@ -128,7 +140,12 @@ grant(To, Quantity) ->
true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
+block(From) ->
+ case blocked() of
+ false -> put(credit_blocked_at, erlang:now());
+ true -> ok
+ end,
+ ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
diff --git a/src/gm.erl b/src/gm.erl
index cb1f70ae..5a82950a 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -546,6 +546,7 @@ forget_group(GroupName) ->
ok.
init([GroupName, Module, Args, TxnFun]) ->
+ put(process_name, {?MODULE, GroupName}),
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index d54c2a8d..19171659 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
lists:foldl(
- fun(Module, {refused, _, _}) ->
- case Module:check_user_login(Username, AuthProps) of
- {error, E} ->
- {refused, "~s failed authenticating ~s: ~p~n",
- [Module, Username, E]};
- Else ->
- Else
+ fun ({ModN, ModZ}, {refused, _, _}) ->
+ %% Different modules for authN vs authZ. So authenticate
+ %% with authN module, then if that succeeds do
+ %% passwordless (i.e pre-authenticated) login with authZ
+ %% module, and use the #user{} the latter gives us.
+ case try_login(ModN, Username, AuthProps) of
+ {ok, _} -> try_login(ModZ, Username, []);
+ Else -> Else
end;
- (_, {ok, User}) ->
+ (Mod, {refused, _, _}) ->
+ %% Same module for authN and authZ. Just take the result
+ %% it gives us
+ try_login(Mod, Username, AuthProps);
+ (_, {ok, User}) ->
+ %% We've successfully authenticated. Skip to the end...
{ok, User}
end, {refused, "No modules checked '~s'", [Username]}, Modules).
+try_login(Module, Username, AuthProps) ->
+ case Module:check_user_login(Username, AuthProps) of
+ {error, E} -> {refused, "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ Else -> Else
+ end.
+
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
check_access(
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 282113a4..773ad60d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,8 +26,8 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
--export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
@@ -150,9 +150,9 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/10 ::
+-spec(basic_consume/9 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
+ rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -160,7 +160,6 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -426,7 +425,7 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_max_length_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
@@ -436,7 +435,7 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
-check_max_length_arg({Type, Val}, Args) ->
+check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
@@ -549,8 +548,8 @@ requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
-reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
+reject(QPid, Requeue, MsgIds, ChPid) ->
+ delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
notify_down_all(QPids, ChPid) ->
{_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
@@ -571,13 +570,11 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
-basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid,
- LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
- ok = check_consume_arguments(QName, OtherArgs),
+basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
+ LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) ->
+ ok = check_consume_arguments(QName, Args),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
- OkMsg}).
+ ConsumerTag, ExclusiveConsume, Args, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -604,8 +601,6 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
-flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
-
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
%% this 'guarded' delete prevents unnecessary writes to the mnesia
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 96851882..3322b3d1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,7 +20,6 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -38,7 +37,7 @@
has_had_consumers,
backing_queue,
backing_queue_state,
- active_consumers,
+ consumers,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -56,21 +55,6 @@
status
}).
--record(consumer, {tag, ack_required, args}).
-
-%% These are held in our process dictionary
--record(cr, {ch_pid,
- monitor_ref,
- acktags,
- consumer_count,
- %% Queue of {ChPid, #consumer{}} for consumers which have
- %% been blocked for any reason
- blocked_consumers,
- %% The limiter itself
- limiter,
- %% Internal flow control for queue -> writer
- unsent_message_count}).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -95,11 +79,12 @@
messages_unacknowledged,
messages,
consumers,
+ consumer_utilisation,
memory,
slave_pids,
synchronised_slave_pids,
backing_queue_status,
- status
+ state
]).
-define(CREATION_EVENT_KEYS,
@@ -122,6 +107,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
+ ?store_proc_name(Q#amqqueue.name),
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -141,14 +127,14 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
- notify_decorators(startup, [], State3),
+ notify_decorators(startup, State3),
State3.
init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- active_consumers = priority_queue:new(),
+ consumers = rabbit_queue_consumers:new(),
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -203,7 +189,7 @@ declare(Recover, From, State = #q{q = Q,
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- notify_decorators(startup, [], State),
+ notify_decorators(startup, State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -228,18 +214,17 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-notify_decorators(Event, Props, State) when Event =:= startup;
- Event =:= shutdown ->
- decorator_callback(qname(State), Event, Props);
+maybe_notify_decorators(false, State) -> State;
+maybe_notify_decorators(true, State) -> notify_decorators(State), State.
+
+notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []).
-notify_decorators(Event, Props, State = #q{active_consumers = ACs,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- P = priority_queue:highest(ACs),
- decorator_callback(qname(State), notify,
- [Event, [{max_active_consumer_priority, P},
- {is_empty, BQ:is_empty(BQS)} |
- Props]]).
+notify_decorators(State = #q{consumers = Consumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ P = rabbit_queue_consumers:max_active_priority(Consumers),
+ decorator_callback(qname(State), consumer_state_changed,
+ [P, BQ:is_empty(BQS)]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -313,7 +298,7 @@ init_max_length(MaxLen, State) ->
State1.
terminate_shutdown(Fun, State) ->
- State1 = #q{backing_queue_state = BQS} =
+ State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1,
fun stop_rate_timer/1,
@@ -323,9 +308,10 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
- notify_decorators(shutdown, [], State),
+ notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
- {Ch, CTag, _, _} <- consumers(State1)],
+ {Ch, CTag, _, _} <-
+ rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -408,131 +394,19 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(State = #q{active_consumers = AC}) ->
- true = (priority_queue:is_empty(AC) orelse is_empty(State)).
+assert_invariant(State = #q{consumers = Consumers}) ->
+ true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
-lookup_ch(ChPid) ->
- case get({ch, ChPid}) of
- undefined -> not_found;
- C -> C
- end.
-
-ch_record(ChPid, LimiterPid) ->
- Key = {ch, ChPid},
- case get(Key) of
- undefined -> MonitorRef = erlang:monitor(process, ChPid),
- Limiter = rabbit_limiter:client(LimiterPid),
- C = #cr{ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = queue:new(),
- consumer_count = 0,
- blocked_consumers = priority_queue:new(),
- limiter = Limiter,
- unsent_message_count = 0},
- put(Key, C),
- C;
- C = #cr{} -> C
- end.
-
-update_ch_record(C = #cr{consumer_count = ConsumerCount,
- acktags = ChAckTags,
- unsent_message_count = UnsentMessageCount}) ->
- case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {true, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
- end,
- C.
-
-store_ch_record(C = #cr{ch_pid = ChPid}) ->
- put({ch, ChPid}, C),
- ok.
-
-erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
- erlang:demonitor(MonitorRef),
- erase({ch, ChPid}),
- ok.
-
-all_ch_record() -> [C || {{ch, _}, C} <- get()].
-
-block_consumer(C = #cr{blocked_consumers = Blocked},
- {_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
- update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}),
- notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
-
-is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
- Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
-
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> notify_decorators(queue_empty, [], State),
- [send_drained(C) || C <- all_ch_record()];
+ true -> notify_decorators(State),
+ rabbit_queue_consumers:send_drained();
false -> ok
end,
State.
-send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
- case rabbit_limiter:drained(Limiter) of
- {[], Limiter} -> ok;
- {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
- ChPid, CTagCredit),
- update_ch_record(C#cr{limiter = Limiter2})
- end.
-
-deliver_msgs_to_consumers(_DeliverFun, true, State) ->
- {true, State};
-deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers}) ->
- case priority_queue:out_p(ActiveConsumers) of
- {empty, _} ->
- {false, State};
- {{value, QEntry, Priority}, Tail} ->
- {Stop, State1} = deliver_msg_to_consumer(
- DeliverFun, QEntry, Priority,
- State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(DeliverFun, Stop, State1)
- end.
-
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) ->
- C = lookup_ch(ChPid),
- case is_ch_blocked(C) of
- true -> block_consumer(C, E, State),
- {false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter,
- Consumer#consumer.ack_required,
- Consumer#consumer.tag) of
- {suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E, State),
- {false, State};
- {continue, Limiter} ->
- AC1 = priority_queue:in(E, Priority,
- State#q.active_consumers),
- deliver_msg_to_consumer0(
- DeliverFun, Consumer, C#cr{limiter = Limiter},
- State#q{active_consumers = AC1})
- end
- end.
-
-deliver_msg_to_consumer0(DeliverFun,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired},
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- unsent_message_count = Count},
- State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, Stop, State1} =
- DeliverFun(AckRequired, State),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
- {QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 = case AckRequired of
- true -> queue:in(AckTag, ChAckTags);
- false -> ChAckTags
- end,
- update_ch_record(C#cr{acktags = ChAckTags1,
- unsent_message_count = Count + 1}),
- {Stop, State1}.
-
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -578,56 +452,68 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- {_Active, State3} = deliver_msgs_to_consumers(
- fun(AckRequired, State1) ->
- {Result, State2} = fetch(AckRequired, State1),
- {Result, is_empty(State2), State2}
- end, is_empty(State), State),
- State3.
+run_message_queue(State) -> run_message_queue(false, State).
-add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
- Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
- {_, P} -> P;
- _ -> 0
- end,
- priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers).
+run_message_queue(ActiveConsumersChanged, State) ->
+ case is_empty(State) of
+ true -> maybe_notify_decorators(ActiveConsumersChanged, State);
+ false -> case rabbit_queue_consumers:deliver(
+ fun(AckRequired) -> fetch(AckRequired, State) end,
+ qname(State), State#q.consumers) of
+ {delivered, ActiveConsumersChanged1, State1, Consumers} ->
+ run_message_queue(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State1#q{consumers = Consumers});
+ {undelivered, ActiveConsumersChanged1, Consumers} ->
+ maybe_notify_decorators(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State#q{consumers = Consumers})
+ end
+ end.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case IsDuplicate of
- false -> deliver_msgs_to_consumers(
- fun (true, State2 = #q{backing_queue_state = BQS2}) ->
- true = BQ:is_empty(BQS2),
- {AckTag, BQS3} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS2),
- {{Message, Delivered, AckTag},
- true, State2#q{backing_queue_state = BQS3}};
- (false, State2) ->
- {{Message, Delivered, undefined},
- true, discard(Delivery, State2)}
- end, false, State1);
- true -> {true, State1}
+ case rabbit_queue_consumers:deliver(
+ fun (true) -> true = BQ:is_empty(BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS),
+ {{Message, Delivered, AckTag},
+ State#q{backing_queue_state = BQS1}};
+ (false) -> {{Message, Delivered, undefined},
+ discard(Delivery, State)}
+ end, qname(State), State#q.consumers) of
+ {delivered, ActiveConsumersChanged, State1, Consumers} ->
+ {delivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State1#q{consumers = Consumers})};
+ {undelivered, ActiveConsumersChanged, Consumers} ->
+ {undelivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State#q{consumers = Consumers})}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
- Delivered, State) ->
+ Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
- case attempt_delivery(Delivery, Props, Delivered, State1) of
- {true, State2} ->
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State2 = State1#q{backing_queue_state = BQS1},
+ case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
+ State2) of
+ true ->
State2;
+ {delivered, State3} ->
+ State3;
%% The next one is an optimisation
- {false, State2 = #q{ttl = 0, dlx = undefined}} ->
- discard(Delivery, State2);
- {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- {Dropped, State3 = #q{backing_queue_state = BQS2}} =
- maybe_drop_head(State2#q{backing_queue_state = BQS1}),
- QLen = BQ:len(BQS2),
+ {undelivered, State3 = #q{ttl = 0, dlx = undefined}} ->
+ discard(Delivery, State3);
+ {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
+ BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
+ {Dropped, State4 = #q{backing_queue_state = BQS4}} =
+ maybe_drop_head(State3#q{backing_queue_state = BQS3}),
+ QLen = BQ:len(BQS4),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -636,9 +522,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State3;
- {true, true, undefined} -> State3;
- {_, _, _} -> drop_expired_msgs(State3)
+ {false, false, _} -> State4;
+ {true, true, undefined} -> State4;
+ {_, _, _} -> drop_expired_msgs(State4)
end
end.
@@ -688,63 +574,18 @@ requeue(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
- end, Queue).
-
-remove_consumers(ChPid, Queue, QName) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag, QName),
- false;
- (_) ->
- true
- end, Queue).
-
-channel_consumers(ChPid, Queue) ->
- priority_queue:fold(
- fun ({CP, #consumer{tag = CTag}}, _, Acc) when CP =:= ChPid ->
- [CTag | Acc];
- (_, _, Acc) ->
- Acc
- end, [], Queue).
-
-possibly_unblock(State, ChPid, Update) ->
- case lookup_ch(ChPid) of
- not_found -> State;
- C -> C1 = Update(C),
- case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
- false -> update_ch_record(C1),
- State;
- true -> unblock(State, C1)
- end
- end.
-
-unblock(State, C = #cr{limiter = Limiter}) ->
- case lists:partition(
- fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
- rabbit_limiter:is_consumer_blocked(Limiter, CTag)
- end, priority_queue:to_list(C#cr.blocked_consumers)) of
- {_, []} ->
- update_ch_record(C),
- State;
- {Blocked, Unblocked} ->
- BlockedQ = priority_queue:from_list(Blocked),
- UnblockedQ = priority_queue:from_list(Unblocked),
- update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- State1 = State#q{active_consumers = AC1},
- [notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State1) ||
- {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State1)
+possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
+ case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
+ unchanged -> State;
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{active_consumers = AC,
+handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder,
senders = Senders}) ->
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
@@ -752,29 +593,22 @@ handle_ch_down(DownPid, State = #q{active_consumers = AC,
true -> credit_flow:peer_down(DownPid),
pmon:demonitor(DownPid, Senders)
end},
- case lookup_ch(DownPid) of
+ case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
not_found ->
{ok, State1};
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- blocked_consumers = Blocked} ->
- QName = qname(State),
- AC1 = remove_consumers(ChPid, AC, QName),
- _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
- ok = erase_ch_record(C),
+ {ChAckTags, ChCTags, Consumers1} ->
+ QName = qname(State1),
+ [emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags],
Holder1 = case Holder of
{DownPid, _} -> none;
Other -> Other
end,
- State2 = State1#q{active_consumers = AC1,
+ State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
- [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
- CTag <- channel_consumers(ChPid, AC)],
- [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
- CTag <- channel_consumers(ChPid, Blocked)],
+ notify_decorators(State2),
case should_auto_delete(State2) of
true -> {stop, State2};
- false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
+ false -> {ok, requeue_and_run(ChAckTags,
ensure_expiry_timer(State2))}
end
end.
@@ -789,10 +623,7 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() ->
- lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
-
-is_unused(_State) -> consumer_count() == 0.
+is_unused(_State) -> rabbit_queue_consumers:count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -804,23 +635,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
State#q{backing_queue_state = BQ:timeout(BQS)}.
subtract_acks(ChPid, AckTags, State, Fun) ->
- case lookup_ch(ChPid) of
- not_found ->
- State;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
- Fun(State)
- end.
-
-subtract_acks([], [], AckQ) ->
- AckQ;
-subtract_acks([], Prefix, AckQ) ->
- queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
-subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
- case queue:out(AckQ) of
- {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
- {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
+ not_found -> State;
+ ok -> Fun(State)
end.
message_properties(Message, Confirm, #q{ttl = TTL}) ->
@@ -900,117 +717,17 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
- dead_letter_publish(Msg, Reason, X, RK, QName),
+ rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),
{Res, State#q{backing_queue_state = BQS2}}.
-dead_letter_publish(Msg, Reason, X, RK, QName) ->
- DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
- {Queues, Cycles} = detect_dead_letter_cycles(
- Reason, DLMsg, rabbit_exchange:route(X, Delivery)),
- lists:foreach(fun log_cycle_once/1, Cycles),
- rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
- ok.
-
stop(State) -> stop(noreply, State).
stop(noreply, State) -> {stop, normal, State};
stop(Reply, State) -> {stop, normal, Reply, State}.
-
-detect_dead_letter_cycles(expired,
- #basic_message{content = Content}, Queues) ->
- #content{properties = #'P_basic'{headers = Headers}} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- NoCycles = {Queues, []},
- case Headers of
- undefined ->
- NoCycles;
- _ ->
- case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
- {array, Deaths} ->
- {Cycling, NotCycling} =
- lists:partition(
- fun (#resource{name = Queue}) ->
- is_dead_letter_cycle(Queue, Deaths)
- end, Queues),
- OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
- {table, D} <- Deaths],
- OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- {NotCycling, [[QName | OldQueues1] ||
- #resource{name = QName} <- Cycling]};
- _ ->
- NoCycles
- end
- end;
-detect_dead_letter_cycles(_Reason, _Msg, Queues) ->
- {Queues, []}.
-
-is_dead_letter_cycle(Queue, Deaths) ->
- {Cycle, Rest} =
- lists:splitwith(
- fun ({table, D}) ->
- {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
- (_) ->
- true
- end, Deaths),
- %% Is there a cycle, and if so, is it entirely due to expiry?
- case Rest of
- [] -> false;
- [H|_] -> lists:all(
- fun ({table, D}) ->
- {longstr, <<"expired">>} =:=
- rabbit_misc:table_lookup(D, <<"reason">>);
- (_) ->
- false
- end, Cycle ++ [H])
- end.
-
-make_dead_letter_msg(Msg = #basic_message{content = Content,
- exchange_name = Exchange,
- routing_keys = RoutingKeys},
- Reason, DLX, RK, #resource{name = QName}) ->
- {DeathRoutingKeys, HeadersFun1} =
- case RK of
- undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
- end,
- ReasonBin = list_to_binary(atom_to_list(Reason)),
- TimeSec = rabbit_misc:now_ms() div 1000,
- PerMsgTTL = per_msg_ttl_header(Content#content.properties),
- HeadersFun2 =
- fun (Headers) ->
- %% The first routing key is the one specified in the
- %% basic.publish; all others are CC or BCC keys.
- RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- RKs1 = [{longstr, Key} || Key <- RKs],
- Info = [{<<"reason">>, longstr, ReasonBin},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, TimeSec},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
- HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
- Info, Headers))
- end,
- Content1 = #content{properties = Props} =
- rabbit_basic:map_headers(HeadersFun2, Content),
- Content2 = Content1#content{properties =
- Props#'P_basic'{expiration = undefined}},
- Msg#basic_message{exchange_name = DLX,
- id = rabbit_guid:gen(),
- routing_keys = DeathRoutingKeys,
- content = Content2}.
-
-per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
- [];
-per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
- [{<<"original-expiration">>, longstr, Expiration}];
-per_msg_ttl_header(_) ->
- [].
-
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1041,12 +758,17 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
+ rabbit_queue_consumers:unacknowledged_message_count();
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
i(consumers, _) ->
- consumer_count();
+ rabbit_queue_consumers:count();
+i(consumer_utilisation, #q{consumers = Consumers}) ->
+ case rabbit_queue_consumers:count() of
+ 0 -> '';
+ _ -> rabbit_queue_consumers:utilisation(Consumers)
+ end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1064,29 +786,21 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(status, #q{status = Status}) ->
- Status;
+i(state, #q{status = running}) -> credit_flow:state();
+i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
throw({bad_argument, Item}).
-consumers(#q{active_consumers = ActiveConsumers}) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
- consumers(ActiveConsumers, []), all_ch_record()).
-
-consumers(Consumers, Acc) ->
- priority_queue:fold(
- fun ({ChPid, Consumer}, _P, Acc1) ->
- #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
- [{ChPid, CTag, Ack, Args} | Acc1]
- end, Acc, Consumers).
-
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+ ExtraKs = [K || {K, _} <- Extra],
+ Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ not lists:member(K, ExtraKs)],
+ rabbit_event:notify(queue_stats, Extra ++ Infos).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
rabbit_event:notify(consumer_created,
@@ -1167,8 +881,8 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From, State) ->
- reply(consumers(State), State);
+handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
+ reply(rabbit_queue_consumers:all(Consumers), State);
handle_call({deliver, Delivery, Delivered}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
@@ -1196,10 +910,8 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
{{Message, IsDelivered, AckTag},
#q{backing_queue = BQ, backing_queue_state = BQS} = State2} ->
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} =
- ch_record(ChPid, LimiterPid),
- ChAckTags1 = queue:in(AckTag, ChAckTags),
- update_ch_record(C#cr{acktags = ChAckTags1});
+ true -> ok = rabbit_queue_consumers:record_ack(
+ ChPid, LimiterPid, AckTag);
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
@@ -1207,78 +919,45 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
- _From, State = #q{active_consumers = AC,
+ ConsumerTag, ExclusiveConsume, Args, OkMsg},
+ _From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use -> reply({error, exclusive_consume_unavailable}, State);
- ok -> C = #cr{consumer_count = Count,
- limiter = Limiter} =
- ch_record(ChPid, LimiterPid),
- Limiter1 = case LimiterActive of
- true -> rabbit_limiter:activate(Limiter);
- false -> Limiter
- end,
- Limiter2 = case CreditArgs of
- none -> Limiter1;
- {Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag,
- Crd, Drain)
- end,
- C1 = update_ch_record(C#cr{consumer_count = Count + 1,
- limiter = Limiter2}),
- case is_empty(State) of
- true -> send_drained(C1);
- false -> ok
- end,
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck,
- args = OtherArgs},
- AC1 = add_consumer({ChPid, Consumer}, AC),
+ ok -> Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ Args, is_empty(State), Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
- State1 = State#q{active_consumers = AC1,
- has_had_consumers = true,
+ State1 = State#q{consumers = Consumers1,
+ has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), OtherArgs),
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ not NoAck, qname(State1), Args),
+ notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{active_consumers = AC,
+ State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
- case lookup_ch(ChPid) of
+ case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
- C = #cr{consumer_count = Count,
- limiter = Limiter,
- blocked_consumers = Blocked} ->
- AC1 = remove_consumer(ChPid, ConsumerTag, AC),
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- Limiter1 = case Count of
- 1 -> rabbit_limiter:deactivate(Limiter);
- _ -> Limiter
- end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
- update_ch_record(C#cr{consumer_count = Count - 1,
- limiter = Limiter2,
- blocked_consumers = Blocked1}),
+ Consumers1 ->
Holder1 = case Holder of
{ChPid, ConsumerTag} -> none;
_ -> Holder
end,
- State1 = State#q{active_consumers = AC1,
+ State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
- notify_decorators(
- basic_cancel, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1288,7 +967,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_expiry_timer(State),
- reply({ok, BQ:len(BQS), consumer_count()}, State1);
+ reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1340,10 +1019,11 @@ handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State);
handle_call(force_event_refresh, _From,
- State = #q{exclusive_consumer = Exclusive}) ->
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
QName = qname(State),
- AllConsumers = consumers(State),
+ AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
none -> [emit_consumer_created(
Ch, CTag, false, AckRequired, QName, Args) ||
@@ -1372,10 +1052,10 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(ack(AckTags, ChPid, State));
-handle_cast({reject, AckTags, true, ChPid}, State) ->
+handle_cast({reject, true, AckTags, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State) ->
+handle_cast({reject, false, AckTags, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
fun (X) -> subtract_acks(ChPid, AckTags, State,
@@ -1389,29 +1069,16 @@ handle_cast(delete_immediately, State) ->
stop(State);
handle_cast({resume, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{limiter = Limiter}) ->
- C#cr{limiter = rabbit_limiter:resume(Limiter)}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:resume_fun(),
+ ChPid, State));
handle_cast({notify_sent, ChPid, Credit}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - Credit}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:notify_sent_fun(Credit),
+ ChPid, State));
handle_cast({activate_limit, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{limiter = Limiter}) ->
- C#cr{limiter = rabbit_limiter:activate(Limiter)}
- end));
-
-handle_cast({flush, ChPid}, State) ->
- ok = rabbit_channel:flushed(ChPid, self()),
- noreply(State);
+ noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
+ ChPid, State));
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -1440,25 +1107,21 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS1});
handle_cast({credit, ChPid, CTag, Credit, Drain},
- State = #q{backing_queue = BQ,
+ State = #q{consumers = Consumers,
+ backing_queue = BQ,
backing_queue_state = BQS}) ->
Len = BQ:len(BQS),
rabbit_channel:send_credit_reply(ChPid, Len),
- C = #cr{limiter = Limiter} = lookup_ch(ChPid),
- C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
- noreply(case Drain andalso Len == 0 of
- true -> update_ch_record(C1),
- send_drained(C1),
- State;
- false -> case is_ch_blocked(C1) of
- true -> update_ch_record(C1),
- State;
- false -> unblock(State, C1)
- end
- end);
+ noreply(
+ case rabbit_queue_consumers:credit(Len == 0, Credit, Drain, ChPid, CTag,
+ Consumers) of
+ unchanged -> State;
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, State1)
+ end);
handle_cast(notify_decorators, State) ->
- notify_decorators(refresh, [], State),
+ notify_decorators(State),
noreply(State);
handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
@@ -1549,20 +1212,10 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
State, #q.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ fun () -> emit_stats(State, [{idle_since, now()},
+ {consumer_utilisation, ''}]) end),
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
#q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
-
-log_cycle_once(Queues) ->
- Key = {queue_cycle, Queues},
- case get(Key) of
- true -> ok;
- undefined -> rabbit_log:warning(
- "Message dropped. Dead-letter queues cycle detected" ++
- ": ~p~nThis cycle will NOT be reported again.~n",
- [Queues]),
- put(Key, true)
- end.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index ae5bbf51..83f68ed3 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -119,52 +119,51 @@ create_frame(TypeInt, ChannelInt, Payload) ->
table_field_to_binary({FName, T, V}) ->
[short_string_to_binary(FName) | field_value_to_binary(T, V)].
-field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)];
-field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>];
+field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)];
+field_value_to_binary(signedint, V) -> [$I, <<V:32/signed>>];
field_value_to_binary(decimal, V) -> {Before, After} = V,
- ["D", Before, <<After:32>>];
-field_value_to_binary(timestamp, V) -> ["T", <<V:64>>];
-field_value_to_binary(table, V) -> ["F", table_to_binary(V)];
-field_value_to_binary(array, V) -> ["A", array_to_binary(V)];
-field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>];
-field_value_to_binary(double, V) -> ["d", <<V:64/float>>];
-field_value_to_binary(float, V) -> ["f", <<V:32/float>>];
-field_value_to_binary(long, V) -> ["l", <<V:64/signed>>];
-field_value_to_binary(short, V) -> ["s", <<V:16/signed>>];
-field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end];
-field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)];
-field_value_to_binary(void, _V) -> ["V"].
+ [$D, Before, <<After:32>>];
+field_value_to_binary(timestamp, V) -> [$T, <<V:64>>];
+field_value_to_binary(table, V) -> [$F | table_to_binary(V)];
+field_value_to_binary(array, V) -> [$A | array_to_binary(V)];
+field_value_to_binary(byte, V) -> [$b, <<V:8/unsigned>>];
+field_value_to_binary(double, V) -> [$d, <<V:64/float>>];
+field_value_to_binary(float, V) -> [$f, <<V:32/float>>];
+field_value_to_binary(long, V) -> [$l, <<V:64/signed>>];
+field_value_to_binary(short, V) -> [$s, <<V:16/signed>>];
+field_value_to_binary(bool, V) -> [$t, if V -> 1; true -> 0 end];
+field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)];
+field_value_to_binary(void, _V) -> [$V].
table_to_binary(Table) when is_list(Table) ->
- BinTable = generate_table(Table),
- [<<(size(BinTable)):32>>, BinTable].
+ BinTable = generate_table_iolist(Table),
+ [<<(iolist_size(BinTable)):32>> | BinTable].
array_to_binary(Array) when is_list(Array) ->
- BinArray = generate_array(Array),
- [<<(size(BinArray)):32>>, BinArray].
+ BinArray = generate_array_iolist(Array),
+ [<<(iolist_size(BinArray)):32>> | BinArray].
generate_table(Table) when is_list(Table) ->
- list_to_binary(lists:map(fun table_field_to_binary/1, Table)).
+ list_to_binary(generate_table_iolist(Table)).
-generate_array(Array) when is_list(Array) ->
- list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end,
- Array)).
+generate_table_iolist(Table) ->
+ lists:map(fun table_field_to_binary/1, Table).
+
+generate_array_iolist(Array) ->
+ lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, Array).
-short_string_to_binary(String) when is_binary(String) ->
- Len = size(String),
- if Len < 256 -> [<<Len:8>>, String];
- true -> exit(content_properties_shortstr_overflow)
- end;
short_string_to_binary(String) ->
- Len = length(String),
+ Len = string_length(String),
if Len < 256 -> [<<Len:8>>, String];
true -> exit(content_properties_shortstr_overflow)
end.
-long_string_to_binary(String) when is_binary(String) ->
- [<<(size(String)):32>>, String];
long_string_to_binary(String) ->
- [<<(length(String)):32>>, String].
+ Len = string_length(String),
+ [<<Len:32>>, String].
+
+string_length(String) when is_binary(String) -> size(String);
+string_length(String) -> length(String).
check_empty_frame_size() ->
%% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index dc6d090f..f65d8ea7 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -20,6 +20,7 @@
-export([parse_table/1]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([validate_utf8/1, assert_utf8/1]).
%%----------------------------------------------------------------------------
@@ -30,6 +31,8 @@
(rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
+-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error').
+-spec(assert_utf8/1 :: (binary()) -> 'ok').
-endif.
@@ -50,35 +53,35 @@ parse_array(<<ValueAndRest/binary>>) ->
{Type, Value, Rest} = parse_field_value(ValueAndRest),
[{Type, Value} | parse_array(Rest)].
-parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
{longstr, V, R};
-parse_field_value(<<"I", V:32/signed, R/binary>>) ->
+parse_field_value(<<$I, V:32/signed, R/binary>>) ->
{signedint, V, R};
-parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) ->
+parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) ->
{decimal, {Before, After}, R};
-parse_field_value(<<"T", V:64/unsigned, R/binary>>) ->
+parse_field_value(<<$T, V:64/unsigned, R/binary>>) ->
{timestamp, V, R};
-parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
+parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
{table, parse_table(Table), R};
-parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
+parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
{array, parse_array(Array), R};
-parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R};
-parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R};
-parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R};
-parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R};
-parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R};
-parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
+parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R};
+parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R};
+parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R};
+parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R};
+parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R};
+parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
-parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
{binary, V, R};
-parse_field_value(<<"V", R/binary>>) ->
+parse_field_value(<<$V, R/binary>>) ->
{void, undefined, R}.
ensure_content_decoded(Content = #content{properties = Props})
@@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) ->
Content;
clear_decoded_content(Content = #content{}) ->
Content#content{properties = none}.
+
+assert_utf8(B) ->
+ case validate_utf8(B) of
+ ok -> ok;
+ error -> rabbit_misc:protocol_error(
+ frame_error, "Malformed UTF-8 in shortstr", [])
+ end.
+
+validate_utf8(Bin) ->
+ try
+ xmerl_ucs:from_utf8(Bin),
+ ok
+ catch exit:{ucs, _} ->
+ error
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bc9ceac0..09798266 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,8 +21,7 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
- flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -37,7 +36,7 @@
conn_name, limiter, tx, next_tag, unacked_message_q, user,
virtual_host, most_recently_declared_queue,
queue_names, queue_monitors, consumer_mapping,
- blocking, queue_consumers, delivering_queues,
+ queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -53,7 +52,7 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked]).
+ state]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -98,7 +97,6 @@
-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
-> 'ok').
--spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -148,9 +146,6 @@ send_credit_reply(Pid, Len) ->
send_drained(Pid, CTagCredit) ->
gen_server2:cast(Pid, {send_drained, CTagCredit}).
-flushed(Pid, QPid) ->
- gen_server2:cast(Pid, {flushed, QPid}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
@@ -193,6 +188,7 @@ force_event_refresh() ->
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
+ ?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
protocol = Protocol,
@@ -211,7 +207,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
- blocking = sets:new(),
queue_consumers = dict:new(),
delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
@@ -271,7 +266,9 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
- try handle_method(Method, Content, State) of
+ try handle_method(rabbit_channel_interceptor:intercept_method(
+ expand_shortcuts(Method, State)),
+ Content, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
noreply(NewState);
@@ -287,9 +284,6 @@ handle_cast({method, Method, Content, Flow},
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast({flushed, QPid}, State) ->
- {noreply, queue_blocked(QPid, State), hibernate};
-
handle_cast(ready_for_close, State = #ch{state = closing,
writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
@@ -364,8 +358,7 @@ handle_info(emit_stats, State) ->
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
- State2 = queue_blocked(QPid, State1),
- State3 = handle_consuming_queue_down(QPid, State2),
+ State3 = handle_consuming_queue_down(QPid, State1),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
#ch{queue_names = QNames, queue_monitors = QMons} = State4,
@@ -476,9 +469,8 @@ check_resource_access(User, Resource, Perm) ->
put(permission_cache, [V | CacheTail])
end.
-clear_permission_cache() ->
- erase(permission_cache),
- ok.
+clear_permission_cache() -> erase(permission_cache),
+ ok.
check_configure_permitted(Resource, #ch{user = User}) ->
check_resource_access(User, Resource, configure).
@@ -526,31 +518,44 @@ check_msg_size(Content) ->
false -> ok
end.
+qbin_to_resource(QueueNameBin, State) ->
+ name_to_resource(queue, QueueNameBin, State).
+
+name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
+ rabbit_misc:r(VHostPath, Type, NameBin).
+
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath,
- most_recently_declared_queue = MRDQ}) ->
- rabbit_misc:r(VHostPath, queue, MRDQ);
-expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) ->
- rabbit_misc:r(VHostPath, queue, QueueNameBin).
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
+ MRDQ;
+expand_queue_name_shortcut(QueueNameBin, _) ->
+ QueueNameBin.
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
-expand_binding(queue, DestinationNameBin, RoutingKey, State) ->
- {expand_queue_name_shortcut(DestinationNameBin, State),
- expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)};
-expand_binding(exchange, DestinationNameBin, RoutingKey, State) ->
- {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin),
- RoutingKey}.
+expand_shortcuts(#'basic.get' {queue = Q} = M, State) ->
+ M#'basic.get' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'basic.consume'{queue = Q} = M, State) ->
+ M#'basic.consume'{queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.delete' {queue = Q} = M, State) ->
+ M#'queue.delete' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.purge' {queue = Q} = M, State) ->
+ M#'queue.purge' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, State) ->
+ M#'queue.bind' {queue = expand_queue_name_shortcut(Q, State),
+ routing_key = expand_routing_key_shortcut(Q, K, State)};
+expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, State) ->
+ M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, State),
+ routing_key = expand_routing_key_shortcut(Q, K, State)};
+expand_shortcuts(M, _State) ->
+ M.
check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
rabbit_misc:protocol_error(
@@ -558,6 +563,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
check_not_default_exchange(_) ->
ok.
+check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>,
+ kind = exchange}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "deletion of system ~s not allowed",
+ [rabbit_misc:rs(XName)]);
+check_exchange_deletion(_) ->
+ ok.
+
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -574,20 +587,6 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
-queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case sets:is_element(QPid, Blocking) of
- false -> State;
- true -> maybe_send_flow_ok(
- State#ch{blocking = sets:del_element(QPid, Blocking)})
- end.
-
-maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
- case sets:size(Blocking) of
- 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
- _ -> ok
- end,
- State.
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -600,7 +599,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
+ State1 = State#ch{state = running},
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {reply, #'channel.open_ok'{}, State1};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -710,7 +713,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
conn_pid = ConnPid,
limiter = Limiter,
next_tag = DeliveryTag}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -748,7 +751,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
@@ -763,13 +766,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
- {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
- ActualConsumerTag, ExclusiveConsume,
- CreditArgs, OtherArgs,
+ ActualConsumerTag, ExclusiveConsume, Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -853,8 +854,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
PrefetchCount, queue:len(UAMQ)),
- {reply, #'basic.qos_ok'{},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
+ case ((not rabbit_limiter:is_active(Limiter)) andalso
+ rabbit_limiter:is_active(Limiter1)) of
+ true -> rabbit_amqqueue:activate_limit_all(
+ consumer_queues(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
@@ -937,6 +943,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
@@ -1057,7 +1064,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_empty = IfEmpty,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with(
QueueName,
@@ -1096,7 +1103,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = qbin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -1142,36 +1149,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true},
- _, State = #ch{limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:unblock(Limiter),
- {reply, #'channel.flow_ok'{active = true},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
-
-handle_method(#'channel.flow'{active = false},
- _, State = #ch{consumer_mapping = Consumers,
- limiter = Limiter}) ->
- case rabbit_limiter:is_blocked(Limiter) of
- true -> {noreply, maybe_send_flow_ok(State)};
- false -> Limiter1 = rabbit_limiter:block(Limiter),
- State1 = maybe_limit_queues(Limiter, Limiter1,
- State#ch{limiter = Limiter1}),
- %% The semantics of channel.flow{active=false}
- %% require that no messages are delivered after the
- %% channel.flow_ok has been sent. We accomplish that
- %% by "flushing" all messages in flight from the
- %% consumer queues to us. To do this we tell all the
- %% queues to invoke rabbit_channel:flushed/2, which
- %% will send us a {flushed, ...} message that appears
- %% *after* all the {deliver, ...} messages. We keep
- %% track of all the QPids thus asked, and once all of
- %% them have responded (or died) we send the
- %% channel.flow_ok.
- QPids = consumer_queues(Consumers),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, maybe_send_flow_ok(
- State1#ch{blocking = sets:from_list(QPids)})}
- end;
+handle_method(#'channel.flow'{active = true}, _, State) ->
+ {reply, #'channel.flow_ok'{active = true}, State};
+
+handle_method(#'channel.flow'{active = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "active=false", []);
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
@@ -1256,29 +1238,18 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-parse_credit_args(Arguments) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
- end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
- undefined -> {none, Arguments}
- end.
-
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid }) ->
- {DestinationName, ActualRoutingKey} =
- expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
+ DestinationName = name_to_resource(DestinationType, DestinationNameBin, State),
check_write_permitted(DestinationName, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, State),
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
- key = ActualRoutingKey,
+ key = RoutingKey,
args = Arguments},
fun (_X, Q = #amqqueue{}) ->
try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
@@ -1331,7 +1302,7 @@ reject(DeliveryTag, Requeue, Multiple,
reject(Requeue, Acked, Limiter) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self())
end, Acked),
ok = notify_limiter(Limiter, Acked).
@@ -1434,15 +1405,6 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-maybe_limit_queues(OldLimiter, NewLimiter, State) ->
- case ((not rabbit_limiter:is_active(OldLimiter)) andalso
- rabbit_limiter:is_active(NewLimiter)) of
- true -> Queues = consumer_queues(State#ch.consumer_mapping),
- rabbit_amqqueue:activate_limit_all(Queues, self());
- false -> ok
- end,
- State.
-
consumer_queues(Consumers) ->
lists:usort([QPid ||
{_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
@@ -1454,7 +1416,7 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
%% optimisation: avoid the potentially expensive 'foldl' in the
%% common case.
- case rabbit_limiter:is_prefetch_limited(Limiter) of
+ case rabbit_limiter:is_active(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1618,10 +1580,10 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(state, #ch{state = running}) -> credit_flow:state();
+i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
-i(client_flow_blocked, #ch{limiter = Limiter}) ->
- rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
@@ -1642,8 +1604,7 @@ update_measures(Type, Key, Inc, Measure) ->
end,
put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)).
-emit_stats(State) ->
- emit_stats(State, []).
+emit_stats(State) -> emit_stats(State, []).
emit_stats(State, Extra) ->
Coarse = infos(?STATISTICS_KEYS, State),
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
new file mode 100644
index 00000000..5d1665e0
--- /dev/null
+++ b/src/rabbit_channel_interceptor.erl
@@ -0,0 +1,91 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+%% Since the AMQP methods used here are queue related,
+%% maybe we want this to be a queue_interceptor.
+
+-module(rabbit_channel_interceptor).
+
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([intercept_method/1]).
+
+-ifdef(use_specs).
+
+-type(intercept_method() :: rabbit_framing:amqp_method_name()).
+-type(original_method() :: rabbit_framing:amqp_method_record()).
+-type(processed_method() :: rabbit_framing:amqp_method_record()).
+
+-callback description() -> [proplists:property()].
+
+-callback intercept(original_method()) ->
+ rabbit_types:ok_or_error2(processed_method(), any()).
+
+%% Whether the interceptor wishes to intercept the amqp method
+-callback applies_to(intercept_method()) -> boolean().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {intercept, 1}, {applies_to, 1}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+intercept_method(#'basic.publish'{} = M) ->
+ M;
+intercept_method(M) ->
+ intercept_method(M, select(rabbit_misc:method_record_type(M))).
+
+intercept_method(M, []) ->
+ M;
+intercept_method(M, [I]) ->
+ case I:intercept(M) of
+ {ok, M2} ->
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
+ end;
+ {error, Reason} ->
+ internal_error("Interceptor: ~p failed with reason: ~p",
+ [I, Reason])
+ end;
+intercept_method(M, Is) ->
+ internal_error("More than one interceptor for method: ~p -- ~p",
+ [rabbit_misc:method_record_type(M), Is]).
+
+%% select the interceptors that apply to intercept_method().
+select(Method) ->
+ [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor),
+ code:which(M) =/= non_existing,
+ M:applies_to(Method)].
+
+validate_method(M, M2) ->
+ rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+
+internal_error(Format, Args) ->
+ rabbit_misc:protocol_error(internal_error, Format, Args). \ No newline at end of file
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index df2e80ca..26f9700e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,9 +47,9 @@
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE,
- {tcp, Sock, Channel, FrameMax,
- ReaderPid, Protocol}),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {tcp, Sock, Channel, FrameMax,
+ ReaderPid, Protocol, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
[WriterPid] = supervisor2:find_child(SupPid, writer),
{ok, ChannelPid} =
@@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE, direct),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
{ok, ChannelPid} =
supervisor2:start_child(
@@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
init(Type) ->
{ok, {{one_for_all, 0, 1}, child_specs(Type)}}.
-child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) ->
+child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) ->
[{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid, true]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
-child_specs(direct) ->
- [{limiter, {rabbit_limiter, start_link, []},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}
+ | child_specs({direct, Identity})];
+child_specs({direct, Identity}) ->
+ [{limiter, {rabbit_limiter, start_link, [Identity]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]}].
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index e51615e8..f268d8d6 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -20,7 +20,7 @@
-export([start_link/0]).
-export([start_channel_sup_sup/1,
- start_queue_collector/1]).
+ start_queue_collector/2]).
-export([init/1]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
--spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/2 :: (pid(), rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) ->
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
-start_queue_collector(SupPid) ->
+start_queue_collector(SupPid, Identity) ->
supervisor2:start_child(
SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
+ {collector, {rabbit_queue_collector, start_link, [Identity]},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6f36f99d..f3463286 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
end.
call(Node, {Mod, Fun, Args}) ->
- rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
+ rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
+
+list_to_binary_utf8(L) ->
+ B = list_to_binary(L),
+ case rabbit_binary_parser:validate_utf8(B) of
+ ok -> B;
+ error -> throw({error, {not_utf_8, L}})
+ end.
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
new file mode 100644
index 00000000..640b282e
--- /dev/null
+++ b/src/rabbit_dead_letter.erl
@@ -0,0 +1,141 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_dead_letter).
+
+-export([publish/5]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(),
+ 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+publish(Msg, Reason, X, RK, QName) ->
+ DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
+ {Queues, Cycles} = detect_cycles(Reason, DLMsg,
+ rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
+
+make_msg(Msg = #basic_message{content = Content,
+ exchange_name = Exchange,
+ routing_keys = RoutingKeys},
+ Reason, DLX, RK, #resource{name = QName}) ->
+ {DeathRoutingKeys, HeadersFun1} =
+ case RK of
+ undefined -> {RoutingKeys, fun (H) -> H end};
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
+ TimeSec = rabbit_misc:now_ms() div 1000,
+ PerMsgTTL = per_msg_ttl_header(Content#content.properties),
+ HeadersFun2 =
+ fun (Headers) ->
+ %% The first routing key is the one specified in the
+ %% basic.publish; all others are CC or BCC keys.
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
+ HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
+ Info, Headers))
+ end,
+ Content1 = #content{properties = Props} =
+ rabbit_basic:map_headers(HeadersFun2, Content),
+ Content2 = Content1#content{properties =
+ Props#'P_basic'{expiration = undefined}},
+ Msg#basic_message{exchange_name = DLX,
+ id = rabbit_guid:gen(),
+ routing_keys = DeathRoutingKeys,
+ content = Content2}.
+
+per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
+ [];
+per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
+ [{<<"original-expiration">>, longstr, Expiration}];
+per_msg_ttl_header(_) ->
+ [].
+
+detect_cycles(expired, #basic_message{content = Content}, Queues) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ NoCycles = {Queues, []},
+ case Headers of
+ undefined ->
+ NoCycles;
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, Deaths} ->
+ {Cycling, NotCycling} =
+ lists:partition(fun (#resource{name = Queue}) ->
+ is_cycle(Queue, Deaths)
+ end, Queues),
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- Deaths],
+ OldQueues1 = [QName || {longstr, QName} <- OldQueues],
+ {NotCycling, [[QName | OldQueues1] ||
+ #resource{name = QName} <- Cycling]};
+ _ ->
+ NoCycles
+ end
+ end;
+detect_cycles(_Reason, _Msg, Queues) ->
+ {Queues, []}.
+
+is_cycle(Queue, Deaths) ->
+ {Cycle, Rest} =
+ lists:splitwith(
+ fun ({table, D}) ->
+ {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
+ (_) ->
+ true
+ end, Deaths),
+ %% Is there a cycle, and if so, is it entirely due to expiry?
+ case Rest of
+ [] -> false;
+ [H|_] -> lists:all(
+ fun ({table, D}) ->
+ {longstr, <<"expired">>} =:=
+ rabbit_misc:table_lookup(D, <<"reason">>);
+ (_) ->
+ false
+ end, Cycle ++ [H])
+ end.
+
+log_cycle_once(Queues) ->
+ Key = {queue_cycle, Queues},
+ case get(Key) of
+ true -> ok;
+ undefined -> rabbit_log:warning(
+ "Message dropped. Dead-letter queues cycle detected" ++
+ ": ~p~nThis cycle will NOT be reported again.~n",
+ [Queues]),
+ put(Key, true)
+ end.
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 17ed8563..ab8c62fe 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -51,7 +51,7 @@ stop() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, false, []),
+ topic, true, false, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ca67254b..ff9de67a 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,8 @@
-module(rabbit_heartbeat).
--export([start/6]).
--export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+-export([start/6, start/7]).
+-export([start_heartbeat_sender/4, start_heartbeat_receiver/4,
pause_monitor/1, resume_monitor/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -39,12 +39,17 @@
non_neg_integer(), heartbeat_callback(),
non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
--spec(start_heartbeat_sender/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
--spec(start_heartbeat_receiver/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
+-spec(start/7 ::
+ (pid(), rabbit_net:socket(), rabbit_types:proc_name(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
+
+-spec(start_heartbeat_sender/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -56,31 +61,35 @@
-endif.
%%----------------------------------------------------------------------------
-
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ start(SupPid, Sock, unknown,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun).
+
+start(SupPid, Sock, Identity,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
{ok, Sender} =
start_heartbeater(SendTimeoutSec, SupPid, Sock,
SendFun, heartbeat_sender,
- start_heartbeat_sender),
+ start_heartbeat_sender, Identity),
{ok, Receiver} =
start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
+ start_heartbeat_receiver, Identity),
{Sender, Receiver}.
-start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () -> SendFun(), continue end}).
+ fun () -> SendFun(), continue end}, Identity).
-start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () -> ReceiveFun(), stop end}).
+ fun () -> ReceiveFun(), stop end}, Identity).
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
@@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
%%----------------------------------------------------------------------------
-start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback,
+ _Identity) ->
{ok, none};
-start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback,
+ Identity) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
-heartbeater(Params) ->
+heartbeater(Params, Identity) ->
Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}.
+ {ok, proc_lib:spawn_link(fun () ->
+ rabbit_misc:store_proc_name(Identity),
+ heartbeater(Params, Deb, {0, 0})
+ end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Deb, {StatVal, SameCount} = State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d5cfbce6..d37b356c 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -17,8 +17,7 @@
%% The purpose of the limiter is to stem the flow of messages from
%% queues to channels, in order to act upon various protocol-level
%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
-%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer)
-%% credit mechanism.
+%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism.
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
@@ -65,11 +64,9 @@
%%
%% 1. Channels tell the limiter about basic.qos prefetch counts -
%% that's what the limit_prefetch/3, unlimit_prefetch/1,
-%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
-%% about - and channel.flow blocking - that's what block/1,
-%% unblock/1 and is_blocked/1 are for. They also tell the limiter
-%% queue state (via the queue) about consumer credit changes -
-%% that's what credit/4 is for.
+%% get_prefetch_limit/1 API functions are about. They also tell the
+%% limiter queue state (via the queue) about consumer credit
+%% changes - that's what credit/5 is for.
%%
%% 2. Queues also tell the limiter queue state about the queue
%% becoming empty (via drained/1) and consumers leaving (via
@@ -83,12 +80,11 @@
%%
%% 5. Queues ask the limiter for permission (with can_send/3) whenever
%% they want to deliver a message to a channel. The limiter checks
-%% whether a) the channel isn't blocked by channel.flow, b) the
-%% volume has not yet reached the prefetch limit, and c) whether
-%% the consumer has enough credit. If so it increments the volume
-%% and tells the queue to proceed. Otherwise it marks the queue as
-%% requiring notification (see below) and tells the queue not to
-%% proceed.
+%% whether a) the volume has not yet reached the prefetch limit,
+%% and b) whether the consumer has enough credit. If so it
+%% increments the volume and tells the queue to proceed. Otherwise
+%% it marks the queue as requiring notification (see below) and
+%% tells the queue not to proceed.
%%
%% 6. A queue that has been told to proceed (by the return value of
%% can_send/3) sends the message to the channel. Conversely, a
@@ -117,16 +113,17 @@
-module(rabbit_limiter).
+-include("rabbit.hrl").
+
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/1]).
%% channel API
--export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
- is_prefetch_limited/1, is_blocked/1, is_active/1,
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, is_active/1,
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -134,27 +131,23 @@
%%----------------------------------------------------------------------------
--record(lstate, {pid, prefetch_limited, blocked}).
+-record(lstate, {pid, prefetch_limited}).
-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
-type(lstate() :: #lstate{pid :: pid(),
- prefetch_limited :: boolean(),
- blocked :: boolean()}).
+ prefetch_limited :: boolean()}).
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
-> lstate()).
-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
--spec(block/1 :: (lstate()) -> lstate()).
--spec(unblock/1 :: (lstate()) -> lstate()).
--spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
--spec(is_blocked/1 :: (lstate()) -> boolean()).
-spec(is_active/1 :: (lstate()) -> boolean()).
-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
@@ -168,8 +161,8 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
- -> qstate()).
+-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
+ boolean()) -> {boolean(), qstate()}).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -180,7 +173,6 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- blocked = false,
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -193,12 +185,12 @@
%% API
%%----------------------------------------------------------------------------
-start_link() -> gen_server2:start_link(?MODULE, [], []).
+start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
ok = gen_server:call(Pid, {new, self()}, infinity),
- #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
+ #lstate{pid = Pid, prefetch_limited = false}.
limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
ok = gen_server:call(
@@ -210,19 +202,7 @@ unlimit_prefetch(L) ->
ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity),
L#lstate{prefetch_limited = false}.
-block(L) ->
- ok = gen_server:call(L#lstate.pid, block, infinity),
- L#lstate{blocked = true}.
-
-unblock(L) ->
- ok = gen_server:call(L#lstate.pid, unblock, infinity),
- L#lstate{blocked = false}.
-
-is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
-
-is_blocked(#lstate{blocked = Blocked}) -> Blocked.
-
-is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
+is_active(#lstate{prefetch_limited = Limited}) -> Limited.
get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
get_prefetch_limit(L) ->
@@ -276,8 +256,12 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
- Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) ->
+ {Res, Cr} = case IsEmpty andalso Drain of
+ true -> {true, make_credit(0, false)};
+ false -> {false, make_credit(Credit, Drain)}
+ end,
+ {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}.
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
@@ -303,6 +287,10 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
%% state for us (#qstate.credits), and maintain a fiction that the
%% limiter is making the decisions...
+make_credit(Credit, Drain) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ #credit{credit = Credit, drain = Drain andalso Credit > 0}.
+
decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
{value, #credit{credit = Credit, drain = Drain}} ->
@@ -312,15 +300,14 @@ decrement_credit(CTag, Credits) ->
end.
update_credit(CTag, Credit, Drain, Credits) ->
- %% Using up all credit implies no need to send a 'drained' event
- Drain1 = Drain andalso Credit > 0,
- gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
+ gb_trees:update(CTag, make_credit(Credit, Drain), Credits).
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) -> {ok, #lim{}}.
+init([ProcName]) -> ?store_proc_name(ProcName),
+ {ok, #lim{}}.
prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
prioritise_call(_Msg, _From, _Len, _State) -> 0.
@@ -339,19 +326,10 @@ handle_call(unlimit_prefetch, _From, State) ->
{reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
volume = 0})};
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call(unblock, _From, State) ->
- {reply, ok, maybe_notify(State, State#lim{blocked = false})};
-
handle_call(get_prefetch_limit, _From,
State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
-handle_call({can_send, QPid, _AckRequired}, _From,
- State = #lim{blocked = true}) ->
- {reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case prefetch_limit_reached(State) of
@@ -387,8 +365,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
- not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
+ case prefetch_limit_reached(OldState) andalso
+ not prefetch_limit_reached(NewState) of
true -> notify_queues(NewState);
false -> NewState
end.
@@ -396,8 +374,6 @@ maybe_notify(OldState, NewState) ->
prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-blocked(#lim{blocked = Blocked}) -> Blocked.
-
remember_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index a0e8bcc6..6661408c 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -323,6 +323,7 @@ ensure_monitoring(CPid, Pids) ->
%% ---------------------------------------------------------------------------
init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
+ ?store_proc_name(QueueName),
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 58400f39..9ce5afcb 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,7 +145,7 @@ sync_mirrors(HandleInfo, EmitStats,
Log("~p messages to synchronise", [BQ:len(BQS)]),
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
Ref = make_ref(),
- Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 96f89ecc..d562210a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -79,6 +79,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
init(Q) ->
+ ?store_proc_name(Q#amqqueue.name),
{ok, {not_started, Q}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
@@ -616,6 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
+ rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName),
rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 61e90105..e3fae4c0 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/7, slave/7]).
+-export([master_prepare/4, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -61,7 +61,8 @@
-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
bqs()}).
--spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(),
+ log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
@@ -80,9 +81,12 @@
%% ---------------------------------------------------------------------------
%% Master
-master_prepare(Ref, Log, SPids) ->
+master_prepare(Ref, QName, Log, SPids) ->
MPid = self(),
- spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+ spawn_link(fun () ->
+ ?store_proc_name(QName),
+ syncer(Ref, Log, MPid, SPids)
+ end).
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 00c4eaf3..80e160d9 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -70,6 +70,7 @@
-export([interval_operation/4]).
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
+-export([store_proc_name/1, store_proc_name/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -248,6 +249,8 @@
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
-spec(get_parent/0 :: () -> pid()).
+-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
+-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-endif.
%%----------------------------------------------------------------------------
@@ -1082,6 +1085,9 @@ stop_timer(State, Idx) ->
end
end.
+store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
+store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index e8c96818..401b8ab1 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -222,10 +222,9 @@ maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
maybe_ntoab(Host) -> Host.
rdns(Addr) ->
- {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups),
- case Lookup of
- true -> list_to_binary(rabbit_networking:tcp_host(Addr));
- _ -> Addr
+ case application:get_env(rabbit, reverse_dns_lookups) of
+ {ok, true} -> list_to_binary(rabbit_networking:tcp_host(Addr));
+ _ -> Addr
end.
sock_funs(inbound) -> {fun peername/1, fun sockname/1};
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 10e68198..488f1df5 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -201,7 +201,7 @@ init([]) ->
%% writing out the cluster status files - bad things can then
%% happen.
process_flag(trap_exit, true),
- net_kernel:monitor_nodes(true),
+ net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
@@ -267,7 +267,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
-handle_info({nodedown, Node}, State) ->
+handle_info({nodedown, Node, Info}, State) ->
+ rabbit_log:info("node ~p down: ~p~n",
+ [Node, proplists:get_value(nodedown_reason, Info)]),
ok = handle_dead_node(Node),
{noreply, State};
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index b54fdd2e..5a1613a7 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,9 @@
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
- is_running/2, is_process_running/2]).
+ is_running/2, is_process_running/2, fqdn_nodename/0]).
+
+-include_lib("kernel/include/inet.hrl").
-define(EPMD_TIMEOUT, 30000).
@@ -35,6 +37,7 @@
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
+-spec(fqdn_nodename/0 :: () -> binary()).
-endif.
@@ -107,3 +110,9 @@ is_process_running(Node, Process) ->
undefined -> false;
P when is_pid(P) -> true
end.
+
+fqdn_nodename() ->
+ {ID, _} = rabbit_nodes:parts(node()),
+ {ok, Host} = inet:gethostname(),
+ {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
+ list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 6406f7e9..855c7995 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server).
--export([start_link/0, register/2, delete_all/1]).
+-export([start_link/1, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
@@ -39,8 +40,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
+start_link(ProcName) ->
+ gen_server:start_link(?MODULE, [ProcName], []).
register(CollectorPid, Q) ->
gen_server:call(CollectorPid, {register, Q}, infinity).
@@ -50,7 +51,8 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
-init([]) ->
+init([ProcName]) ->
+ ?store_proc_name(ProcName),
{ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
new file mode 100644
index 00000000..bea7e0d0
--- /dev/null
+++ b/src/rabbit_queue_consumers.erl
@@ -0,0 +1,440 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_consumers).
+
+-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
+ unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
+ possibly_unblock/3,
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
+ credit/6, utilisation/1]).
+
+%%----------------------------------------------------------------------------
+
+-define(UNSENT_MESSAGE_LIMIT, 200).
+
+-record(state, {consumers, use}).
+
+-record(consumer, {tag, ack_required, args}).
+
+%% These are held in our process dictionary
+-record(cr, {ch_pid,
+ monitor_ref,
+ acktags,
+ consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
+ blocked_consumers,
+ %% The limiter itself
+ limiter,
+ %% Internal flow control for queue -> writer
+ unsent_message_count}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type time_micros() :: non_neg_integer().
+-type ratio() :: float().
+-type state() :: #state{consumers ::priority_queue:q(),
+ use :: {'inactive',
+ time_micros(), time_micros(), ratio()} |
+ {'active', time_micros(), ratio()}}.
+-type ch() :: pid().
+-type ack() :: non_neg_integer().
+-type cr_fun() :: fun ((#cr{}) -> #cr{}).
+-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
+
+-spec new() -> state().
+-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
+-spec inactive(state()) -> boolean().
+-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}].
+-spec count() -> non_neg_integer().
+-spec unacknowledged_message_count() -> non_neg_integer().
+-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
+ rabbit_framing:amqp_table(), boolean(), state()) -> state().
+-spec remove(ch(), rabbit_types:ctag(), state()) ->
+ 'not_found' | state().
+-spec erase_ch(ch(), state()) ->
+ 'not_found' | {[ack()], [rabbit_types:ctag()],
+ state()}.
+-spec send_drained() -> 'ok'.
+-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
+ rabbit_amqqueue:name(), state()) ->
+ {'delivered', boolean(), T, state()} |
+ {'undelivered', boolean(), state()}.
+-spec record_ack(ch(), pid(), ack()) -> 'ok'.
+-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+-spec possibly_unblock(cr_fun(), ch(), state()) ->
+ 'unchanged' | {'unblocked', state()}.
+-spec resume_fun() -> cr_fun().
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+-spec activate_limit_fun() -> cr_fun().
+-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
+ state()) -> 'unchanged' | {'unblocked', state()}.
+-spec utilisation(state()) -> ratio().
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+new() -> #state{consumers = priority_queue:new(),
+ use = {inactive, now_micros(), 0, 0.0}}.
+
+max_active_priority(#state{consumers = Consumers}) ->
+ priority_queue:highest(Consumers).
+
+inactive(#state{consumers = Consumers}) ->
+ priority_queue:is_empty(Consumers).
+
+all(#state{consumers = Consumers}) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
+ consumers(Consumers, []), all_ch_record()).
+
+consumers(Consumers, Acc) ->
+ priority_queue:fold(
+ fun ({ChPid, Consumer}, _P, Acc1) ->
+ #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Args} | Acc1]
+ end, Acc, Consumers).
+
+count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+
+unacknowledged_message_count() ->
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
+
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
+ update_ch_record(case parse_credit_args(Args) of
+ none -> C1;
+ {Crd, Drain} -> credit_and_drain(
+ C1, CTag, Crd, Drain, IsEmpty)
+ end),
+ Consumer = #consumer{tag = CTag,
+ ack_required = not NoAck,
+ args = Args},
+ State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
+
+remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
+ Blocked1 = remove_consumer(ChPid, CTag, Blocked),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag),
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter2,
+ blocked_consumers = Blocked1}),
+ State#state{consumers =
+ remove_consumer(ChPid, CTag, Consumers)}
+ end.
+
+erase_ch(ChPid, State = #state{consumers = Consumers}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = BlockedQ} ->
+ AllConsumers = priority_queue:join(Consumers, BlockedQ),
+ ok = erase_ch_record(C),
+ {queue:to_list(ChAckTags),
+ tags(priority_queue:to_list(AllConsumers)),
+ State#state{consumers = remove_consumers(ChPid, Consumers)}}
+ end.
+
+send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
+ ok.
+
+deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
+
+deliver(FetchFun, QName, ConsumersChanged,
+ State = #state{consumers = Consumers}) ->
+ case priority_queue:out_p(Consumers) of
+ {empty, _} ->
+ {undelivered, ConsumersChanged,
+ State#state{use = update_use(State#state.use, inactive)}};
+ {{value, QEntry, Priority}, Tail} ->
+ case deliver_to_consumer(FetchFun, QEntry, QName) of
+ {delivered, R} ->
+ {delivered, ConsumersChanged, R,
+ State#state{consumers = priority_queue:in(QEntry, Priority,
+ Tail)}};
+ undelivered ->
+ deliver(FetchFun, QName, true,
+ State#state{consumers = Tail})
+ end
+ end.
+
+deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
+ C = lookup_ch(ChPid),
+ case is_ch_blocked(C) of
+ true -> block_consumer(C, E),
+ undelivered;
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ undelivered;
+ {continue, Limiter} ->
+ {delivered, deliver_to_consumer(
+ FetchFun, Consumer,
+ C#cr{limiter = Limiter}, QName)}
+ end
+ end.
+
+deliver_to_consumer(FetchFun,
+ #consumer{tag = CTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ QName) ->
+ {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
+ rabbit_channel:deliver(ChPid, CTag, AckRequired,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> queue:in(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
+ update_ch_record(C#cr{acktags = ChAckTags1,
+ unsent_message_count = Count + 1}),
+ R.
+
+record_ack(ChPid, LimiterPid, AckTag) ->
+ C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
+ update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
+ ok.
+
+subtract_acks(ChPid, AckTags) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
+ ok
+ end.
+
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
+possibly_unblock(Update, ChPid, State) ->
+ case lookup_ch(ChPid) of
+ not_found -> unchanged;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ unchanged;
+ true -> unblock(C1, State)
+ end
+ end.
+
+unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
+ State = #state{consumers = Consumers, use = Use}) ->
+ case lists:partition(
+ fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, priority_queue:to_list(BlockedQ)) of
+ {_, []} ->
+ update_ch_record(C),
+ unchanged;
+ {Blocked, Unblocked} ->
+ BlockedQ1 = priority_queue:from_list(Blocked),
+ UnblockedQ = priority_queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
+ {unblocked,
+ State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
+ use = update_use(Use, active)}}
+ end.
+
+resume_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end.
+
+notify_sent_fun(Credit) ->
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - Credit}
+ end.
+
+activate_limit_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end.
+
+credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ unchanged;
+ #cr{limiter = Limiter} = C ->
+ C1 = #cr{limiter = Limiter1} =
+ credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ case is_ch_blocked(C1) orelse
+ (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
+ rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
+ true -> update_ch_record(C1),
+ unchanged;
+ false -> unblock(C1, State)
+ end
+ end.
+
+utilisation(#state{use = {active, Since, Avg}}) ->
+ use_avg(now_micros() - Since, 0, Avg);
+utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
+ use_avg(Active, now_micros() - Since, Avg).
+
+%%----------------------------------------------------------------------------
+
+parse_credit_args(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
+lookup_ch(ChPid) ->
+ case get({ch, ChPid}) of
+ undefined -> not_found;
+ C -> C
+ end.
+
+ch_record(ChPid, LimiterPid) ->
+ Key = {ch, ChPid},
+ case get(Key) of
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
+ C = #cr{ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = queue:new(),
+ consumer_count = 0,
+ blocked_consumers = priority_queue:new(),
+ limiter = Limiter,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
+ C = #cr{} -> C
+ end.
+
+update_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ unsent_message_count = UnsentMessageCount}) ->
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
+ end,
+ C.
+
+store_ch_record(C = #cr{ch_pid = ChPid}) ->
+ put({ch, ChPid}, C),
+ ok.
+
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}),
+ ok.
+
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
+
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+ update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).
+
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> C;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ C#cr{limiter = Limiter2}
+ end.
+
+credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
+ CTag, Credit, Drain, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ {true, Limiter1} -> rabbit_channel:send_drained(ChPid,
+ [{CTag, Credit}]),
+ C#cr{limiter = Limiter1};
+ {false, Limiter1} -> C#cr{limiter = Limiter1}
+ end.
+
+tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].
+
+add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
+ Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, P} -> P;
+ _ -> 0
+ end,
+ priority_queue:in({ChPid, Consumer}, Priority, Queue).
+
+remove_consumer(ChPid, CTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= CTag)
+ end, Queue).
+
+remove_consumers(ChPid, Queue) ->
+ priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
+ (_) -> true
+ end, Queue).
+
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, use_avg(Active, Now - Since, Avg)}.
+
+use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
+now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 8f6375a5..6205e2dc 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -8,13 +8,6 @@
-ifdef(use_specs).
--type(notify_event() :: 'consumer_blocked' |
- 'consumer_unblocked' |
- 'queue_empty' |
- 'basic_consume' |
- 'basic_cancel' |
- 'refresh').
-
-callback startup(rabbit_types:amqqueue()) -> 'ok'.
-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
@@ -24,7 +17,9 @@
-callback active_for(rabbit_types:amqqueue()) -> boolean().
--callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+%% called with Queue, MaxActivePriority, IsEmpty
+-callback consumer_state_changed(
+ rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'.
-else.
@@ -32,7 +27,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
- {active_for, 1}, {notify, 3}];
+ {active_for, 1}, {consumer_state_changed, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1ae9bacf..64debcab 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/2, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/4, recvloop/4]).
-export([conserve_resources/3, server_properties/1]).
@@ -32,15 +32,16 @@
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, helper_sup, queue_collector, heartbeater,
- stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
+ stats_timer, channel_sup_sup_pid, channel_count, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, vhost,
+ protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
@@ -48,15 +49,14 @@
blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, last_blocked_by, last_blocked_age,
- channels]).
+ send_pend, state, channels]).
-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -91,9 +91,10 @@
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
--spec(mainloop/2 :: (_,#v1{}) -> any()).
+-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
--spec(system_continue/3 :: (_,_,#v1{}) -> any()).
+-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) ->
+ any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
-endif.
@@ -113,8 +114,8 @@ init(Parent, HelperSup) ->
start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
-system_continue(Parent, Deb, State) ->
- mainloop(Deb, State#v1{parent = Parent}).
+system_continue(Parent, Deb, {Buf, BufLen, State}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -213,6 +214,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
+ ?store_proc_name(list_to_binary(Name)),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -238,8 +240,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
helper_sup = HelperSup,
heartbeater = none,
channel_sup_sup_pid = none,
- buf = [],
- buf_len = 0,
+ channel_count = 0,
throttle = #throttle{
alarmed_by = [],
last_blocked_by = none,
@@ -247,9 +248,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
blocked_sent = false}},
try
run({?MODULE, recvloop,
- [Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)]}),
+ [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -276,31 +277,41 @@ run({M, F, A}) ->
catch {become, MFA} -> run(MFA)
end.
-recvloop(Deb, State = #v1{pending_recv = true}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{connection_state = blocked}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
+recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, Buf, BufLen, State)});
+recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
case rabbit_net:setopts(Sock, [{active, once}]) of
- ok -> mainloop(Deb, State#v1{pending_recv = true});
+ ok -> mainloop(Deb, Buf, BufLen,
+ State#v1{pending_recv = true});
{error, Reason} -> stop(Reason, State)
end;
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
- {Data, Rest} = split_binary(case Buf of
- [B] -> B;
- _ -> list_to_binary(lists:reverse(Buf))
- end, RecvLen),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = [Rest],
- buf_len = BufLen - RecvLen})).
-
-mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
+recvloop(Deb, [B], _BufLen, State) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, [Rest], size(Rest), State1);
+recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
+ {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
+ Data = list_to_binary(lists:reverse(DataLRev)),
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
+
+binlist_split(0, L, Acc) ->
+ {L, Acc};
+binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
+ {H, T} = split_binary(Acc0, -Len),
+ {[H|L], [T|Acc]};
+binlist_split(Len, [H|T], Acc) ->
+ binlist_split(Len - size(H), T, [H|Acc]).
+
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
case rabbit_net:recv(Sock) of
{data, Data} ->
- recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
+ recvloop(Deb, [Data | Buf], BufLen + size(Data),
+ State#v1{pending_recv = false});
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
@@ -309,11 +320,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
stop(Reason, State);
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
- ?MODULE, Deb, State);
+ ?MODULE, Deb, {Buf, BufLen, State});
{other, Other} ->
case handle_other(Other, State) of
stop -> ok;
- NewState -> recvloop(Deb, NewState)
+ NewState -> recvloop(Deb, Buf, BufLen, NewState)
end
end.
@@ -333,8 +344,8 @@ handle_other({conserve_resources, Source, Conserve},
control_throttle(State#v1{throttle = Throttle1});
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- maybe_close(control_throttle(State));
+ {_, State1} = channel_cleanup(ChPid, State),
+ maybe_close(control_throttle(State1));
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -491,63 +502,59 @@ close_connection(State = #v1{queue_collector = Collector,
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, controlled} -> State;
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, controlled} -> State1;
{undefined, uncontrolled} -> exit({abnormal_dependent_exit,
ChPid, Reason});
- {_Channel, controlled} -> maybe_close(control_throttle(State));
- {Channel, uncontrolled} -> State1 = handle_exception(
- State, Channel, Reason),
- maybe_close(control_throttle(State1))
+ {_, controlled} -> maybe_close(control_throttle(State1));
+ {_, uncontrolled} -> State2 = handle_exception(
+ State1, Channel, Reason),
+ maybe_close(control_throttle(State2))
end.
-terminate_channels() ->
- NChannels =
- length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
- if NChannels > 0 ->
- Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
- TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
- wait_for_channel_termination(NChannels, TimerRef);
- true -> ok
- end.
+terminate_channels(#v1{channel_count = 0} = State) ->
+ State;
+terminate_channels(#v1{channel_count = ChannelCount} = State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount,
+ TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
+ wait_for_channel_termination(ChannelCount, TimerRef, State).
-wait_for_channel_termination(0, TimerRef) ->
+wait_for_channel_termination(0, TimerRef, State) ->
case erlang:cancel_timer(TimerRef) of
false -> receive
- cancel_wait -> ok
+ cancel_wait -> State
end;
- _ -> ok
+ _ -> State
end;
-
-wait_for_channel_termination(N, TimerRef) ->
+wait_for_channel_termination(N, TimerRef, State) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, _} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- wait_for_channel_termination(N-1, TimerRef);
- {Channel, uncontrolled} ->
- log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(N-1, TimerRef)
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, _} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_, controlled} -> wait_for_channel_termination(
+ N-1, TimerRef, State1);
+ {_, uncontrolled} -> log(error,
+ "AMQP connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
+ wait_for_channel_termination(
+ N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
end.
maybe_close(State = #v1{connection_state = closing,
- connection = #connection{protocol = Protocol},
- sock = Sock}) ->
- case all_channels() of
- [] ->
- NewState = close_connection(State),
- ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- NewState;
- _ -> State
- end;
+ channel_count = 0,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ NewState = close_connection(State),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ NewState;
maybe_close(State) ->
State.
@@ -566,8 +573,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
[self(), CS, Channel, Reason]),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
+ State1 = close_connection(terminate_channels(State)),
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
State1;
handle_exception(State, Channel, Reason) ->
@@ -605,15 +611,26 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{name = Name,
- protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
+create_channel(_Channel,
+ #v1{channel_count = ChannelCount,
+ connection = #connection{channel_max = ChannelMax}})
+ when ChannelMax /= 0 andalso ChannelCount >= ChannelMax ->
+ {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has reached the "
+ "negotiated channel_max (~w)",
+ [ChannelCount, ChannelMax], 'none')};
+create_channel(Channel,
+ #v1{sock = Sock,
+ queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ channel_count = ChannelCount,
+ connection =
+ #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State) ->
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
@@ -621,16 +638,16 @@ create_channel(Channel, State) ->
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
- {ChPid, AState}.
+ {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}.
-channel_cleanup(ChPid) ->
+channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
case get({ch_pid, ChPid}) of
- undefined -> undefined;
+ undefined -> {undefined, State};
{Channel, MRef} -> credit_flow:peer_down(ChPid),
erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
- Channel
+ {Channel, State#v1{channel_count = ChannelCount - 1}}
end.
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
@@ -669,32 +686,36 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- {ChPid, AState} = case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> Other
- end,
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other, State}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}, State1} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State1));
+ {error, Reason} ->
+ handle_exception(State1, Channel, Reason)
+ end
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
- channel_cleanup(ChPid),
+ {_, State1} = channel_cleanup(ChPid, State),
%% This is not strictly necessary, but more obviously
%% correct. Also note that we do not need to call maybe_close/1
%% since we cannot possibly be in the 'closing' state.
- control_throttle(State);
+ control_throttle(State1);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->
@@ -708,32 +729,36 @@ post_process_frame(_Frame, _ChPid, State) ->
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
fatal_frame_error(
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- ensure_stats_timer(
- switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1));
-
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- switch_callback(State1, frame_header, 7);
+ {Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
-
-handle_input(handshake, <<"AMQP", A, B, C, D>>, State) ->
- handshake({A, B, C, D}, State);
-handle_input(handshake, Other, #v1{sock = Sock}) ->
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
-
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
@@ -848,38 +873,33 @@ handle_method0(#'connection.secure_ok'{response = Response},
State = #v1{connection_state = securing}) ->
auth_phase(Response, State);
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- heartbeat = ClientHeartbeat},
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
helper_sup = SupPid,
sock = Sock}) ->
- ServerFrameMax = server_frame_max(),
- if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w < ~w min size",
- [FrameMax, ?FRAME_MIN_SIZE]);
- ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ServerFrameMax]);
- true ->
- {ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
- queue_collector = Collector,
- heartbeater = Heartbeater}
- end;
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(
+ SupPid, Connection#connection.name),
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater = rabbit_heartbeat:start(
+ SupPid, Sock, Connection#connection.name,
+ ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
@@ -927,13 +947,28 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-server_frame_max() ->
- {ok, FrameMax} = application:get_env(rabbit, frame_max),
- FrameMax.
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, ServerValue, ClientValue);
+ ServerValue /= 0 andalso ClientValue > ServerValue ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
-server_heartbeat() ->
- {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
- Heartbeat.
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
+
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -999,9 +1034,9 @@ auth_phase(Response,
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
{ok, User} ->
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = server_frame_max(),
- heartbeat = server_heartbeat()},
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User,
@@ -1028,13 +1063,15 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
- infinity;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
- timer:now_diff(erlang:now(), T) / 1000000;
-i(channels, #v1{}) -> length(all_channels());
+i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount;
+i(state, #v1{connection_state = ConnectionState,
+ throttle = #throttle{last_blocked_by = BlockedBy,
+ last_blocked_at = T}}) ->
+ Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000,
+ case {BlockedBy, Recent} of
+ {flow, true} -> flow;
+ {_, _} -> ConnectionState
+ end;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
ic(name, #connection{name = Name}) -> Name;
@@ -1049,6 +1086,7 @@ ic(user, #connection{user = U}) -> U#user.username;
ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
@@ -1089,10 +1127,8 @@ emit_stats(State) ->
%% If we emit an event which looks like we are in flow control, it's not a
%% good idea for it to be our last even if we go idle. Keep emitting
%% events, either we stay busy or we drop out of flow control.
- %% The 5 is to match the test in formatters.js:fmt_connection_state().
- %% This magic number will go away when bug 24829 is merged.
- case proplists:get_value(last_blocked_age, Infos) < 5 of
- true -> ensure_stats_timer(State1);
+ case proplists:get_value(state, Infos) of
+ flow -> ensure_stats_timer(State1);
_ -> State1
end.
@@ -1110,15 +1146,16 @@ become_1_0(Id, State = #v1{sock = Sock}) ->
Sock, {unsupported_amqp1_0_protocol_id, Id},
{3, 1, 0, 0})
end,
- throw({become, {rabbit_amqp1_0_reader, init,
- [Mode, pack_for_1_0(State)]}})
+ F = fun (_Deb, Buf, BufLen, S) ->
+ {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(Buf, BufLen, S)]}
+ end,
+ State = #v1{connection_state = {become, F}}
end.
-pack_for_1_0(#v1{parent = Parent,
- sock = Sock,
- recv_len = RecvLen,
- pending_recv = PendingRecv,
- helper_sup = SupPid,
- buf = Buf,
- buf_len = BufLen}) ->
+pack_for_1_0(Buf, BufLen, #v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ helper_sup = SupPid}) ->
{Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 3014aeb7..abb71e7a 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(queue_decorator) -> rabbit_queue_decorator;
-class_module(policy_validator) -> rabbit_policy_validator;
-class_module(ha_mode) -> rabbit_mirror_queue_mode.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
+class_module(policy_validator) -> rabbit_policy_validator;
+class_module(ha_mode) -> rabbit_mirror_queue_mode;
+class_module(channel_interceptor) -> rabbit_channel_interceptor.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5fe319d3..731351f0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1172,7 +1172,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, [], undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -1262,7 +1262,7 @@ test_writer(Pid) ->
test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], Me, Limiter),
@@ -2815,7 +2815,7 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
@@ -2842,7 +2842,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index a36613db..0edebff1 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -30,7 +30,8 @@
connection/0, protocol/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
- channel_exit/0, connection_exit/0, mfargs/0]).
+ channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
+ proc_type_and_name/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +157,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
+-type(proc_name() :: term()).
+-type(proc_type_and_name() :: {atom(), proc_name()}).
+
-endif. % use_specs
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 6f95ef60..90372461 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -46,6 +46,7 @@
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
+-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
%% -------------------------------------------------------------------
@@ -74,6 +75,7 @@
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
+-spec(internal_system_x/0 :: () -> 'ok').
-endif.
@@ -340,6 +342,19 @@ queue_decorators(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, policy, gm_pids, decorators]).
+internal_system_x() ->
+ transform(
+ rabbit_durable_exchange,
+ fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>},
+ Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches,
+ Policy, Decorators};
+ (X) ->
+ X
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8d013d43..047bce77 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -60,15 +60,17 @@ add(VHostPath) ->
(ok, false) ->
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout},
- {<<"amq.rabbitmq.trace">>, topic}]],
+ Type, true, false, Internal, []) ||
+ {Name, Type, Internal} <-
+ [{<<"">>, direct, false},
+ {<<"amq.direct">>, direct, false},
+ {<<"amq.topic">>, topic, false},
+ %% per 0-9-1 pdf
+ {<<"amq.match">>, headers, false},
+ %% per 0-9-1 xml
+ {<<"amq.headers">>, headers, false},
+ {<<"amq.fanout">>, fanout, false},
+ {<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
rabbit_event:notify(vhost_created, info(VHostPath)),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 34dd3d3b..3571692b 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, start/6, start_link/6]).
+-export([start/6, start_link/6, start/7, start_link/7]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -30,7 +30,7 @@
-export([internal_send_command/4, internal_send_command/6]).
%% internal
--export([mainloop/2, mainloop1/2]).
+-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
-record(wstate, {sock, channel, frame_max, protocol, reader,
stats_timer, pending}).
@@ -41,21 +41,25 @@
-ifdef(use_specs).
--spec(start/5 ::
+-spec(start/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name())
-> rabbit_types:ok(pid())).
--spec(start_link/5 ::
+-spec(start_link/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name())
-> rabbit_types:ok(pid())).
--spec(start/6 ::
+-spec(start/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
@@ -99,23 +103,23 @@
%%---------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
+ start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false).
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
+ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, false).
-start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -138,6 +142,11 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+enter_mainloop(Identity, State) ->
+ Deb = sys:debug_options([]),
+ ?store_proc_name(Identity),
+ mainloop(Deb, State).
+
mainloop(Deb, State) ->
try
mainloop1(Deb, State)