summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-30 14:03:30 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-30 14:03:30 +0000
commit1cb35e55a99a2733d8237168e7e67868ff8c9651 (patch)
tree5ee1b4dae5738e58f3eefd3feee4c760629bc522
parent809d09982022889cb53e00323a69fcb07787dabe (diff)
parentfe0851e4573f2abe13c847e2f3c3f36c3f307d39 (diff)
downloadrabbitmq-server-1cb35e55a99a2733d8237168e7e67868ff8c9651.tar.gz
merge bug25827 into default
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--include/rabbit.hrl9
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/dtree.erl11
-rw-r--r--src/gm.erl106
-rw-r--r--src/rabbit_access_control.erl29
-rw-r--r--src/rabbit_amqqueue.erl71
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_basic.erl28
-rw-r--r--src/rabbit_channel.erl233
-rw-r--r--src/rabbit_channel_interceptor.erl20
-rw-r--r--src/rabbit_dead_letter.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl19
-rw-r--r--src/rabbit_limiter.erl66
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_queue_consumers.erl46
-rw-r--r--src/rabbit_reader.erl18
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_trace.erl6
21 files changed, 366 insertions, 366 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d2a3f7c7..d19acd00 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1559,14 +1559,6 @@
<term>prefetch_count</term>
<listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
</varlistentry>
- <varlistentry>
- <term>client_flow_blocked</term>
- <listitem><para>True if the client issued a
- <command>channel.flow{active=false}</command>
- command, blocking the server from delivering
- messages to the channel's consumers.
- </para></listitem>
- </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index afb6e576..6d117e3d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -70,7 +70,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, confirm, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
@@ -112,4 +112,11 @@
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
+%% Trying to send a term across a cluster larger than 2^31 bytes will
+%% cause the VM to exit with "Absurdly large distribution output data
+%% buffer". So we limit the max message size to 2^31 - 10^6 bytes (1MB
+%% to allow plenty of leeway for the #basic_message{} and #content{}
+%% wrapping the message body).
+-define(MAX_MSG_SIZE, 2147383648).
+
-define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index f53eea95..a96ccb35 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Thu Jan 23 2014 emile@rabbitmq.com 3.2.3-1
+- New Upstream Release
+
* Tue Dec 10 2013 emile@rabbitmq.com 3.2.2-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index f1e1c66b..7138409c 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.2.3-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Emile Joubert <emile@rabbitmq.com> Thu, 23 Jan 2014 14:46:37 +0000
+
rabbitmq-server (3.2.2-1) unstable; urgency=low
* New Upstream Release
diff --git a/src/dtree.erl b/src/dtree.erl
index 5ff36bd9..72abe248 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +53,7 @@
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(drop/2 :: (pk(), ?MODULE()) -> ?MODULE()).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -120,6 +121,14 @@ take_all(SK, {P, S}) ->
{KVs, {P1, prune(SKS, PKS, S)}}
end.
+%% Drop all entries for the given primary key (which does not have to exist).
+drop(PK, {P, S}) ->
+ case gb_trees:lookup(PK, P) of
+ none -> {P, S};
+ {value, {SKS, _V}} -> {gb_trees:delete(PK, P),
+ prune(SKS, gb_sets:singleton(PK), S)}
+ end.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
diff --git a/src/gm.erl b/src/gm.erl
index df1c258d..5a82950a 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -382,7 +382,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/4, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -395,6 +395,7 @@
-export([table_definitions/0]).
-define(GROUP_TABLE, gm_group).
+-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
@@ -414,6 +415,7 @@
callback_args,
confirms,
broadcast_buffer,
+ broadcast_buffer_sz,
broadcast_timer,
txn_executor
}).
@@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) ->
leave(Server) ->
gen_server2:cast(Server, leave).
-broadcast(Server, Msg) ->
- gen_server2:cast(Server, {broadcast, Msg}).
+broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
+
+broadcast(Server, Msg, SizeHint) ->
+ gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
@@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) ->
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = -1,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new(),
- broadcast_buffer = [],
- broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = -1,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From,
ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
- internal_broadcast(Msg, From, State);
+ internal_broadcast(Msg, From, 0, State);
handle_call(info, _From,
State = #state { members_state = undefined }) ->
@@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
-handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
+handle_cast({broadcast, _Msg, _SizeHint},
+ State = #state { members_state = undefined }) ->
noreply(State);
-handle_cast({broadcast, Msg},
+handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
module = Module,
@@ -650,8 +656,8 @@ handle_cast({broadcast, Msg},
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
State});
-handle_cast({broadcast, Msg}, State) ->
- internal_broadcast(Msg, none, State);
+handle_cast({broadcast, Msg, SizeHint}, State) ->
+ internal_broadcast(Msg, none, SizeHint, State);
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
@@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
ensure_broadcast_timer(State) ->
State.
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- module = Module,
- confirms = Confirms,
- callback_args = Args,
- broadcast_buffer = Buffer }) ->
+internal_broadcast(Msg, From, SizeHint,
+ State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer,
+ broadcast_buffer_sz = BufferSize }) ->
PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount1, Msg} | Buffer],
@@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self,
none -> Confirms;
_ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount1,
- confirms = Confirms1,
- broadcast_buffer = Buffer1 },
- handle_callback_result({Result, case From of
- none -> State1;
- _ -> flush_broadcast_buffer(State1)
- end}).
+ State1 = State #state { pub_count = PubCount1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1,
+ broadcast_buffer_sz = BufferSize + SizeHint},
+ handle_callback_result(
+ {Result, case From of
+ none -> maybe_flush_broadcast_buffer(State1);
+ _ -> flush_broadcast_buffer(State1)
+ end}).
+
+%% The Erlang distribution mechanism has an interesting quirk - it
+%% will kill the VM cold with "Absurdly large distribution output data
+%% buffer" if you attempt to send a message which serialises out to
+%% more than 2^31 bytes in size. It's therefore a very good idea to
+%% make sure that we don't exceed that size!
+%%
+%% Now, we could figure out the size of messages as they come in using
+%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
+%% us to serialise the message only to throw the serialised form
+%% away. Hard to believe that's a sensible thing to do. So instead we
+%% accept a size hint from the application, via broadcast/3. This size
+%% hint can be the size of anything in the message which we expect
+%% could be large, and we just ignore the size of any small bits of
+%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
+%% conservatively at 100MB - but the buffer is only to allow us to
+%% buffer tiny messages anyway, so 100MB is plenty.
+
+maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
+ case Size > ?MAX_BUFFER_SIZE of
+ true -> flush_broadcast_buffer(State);
+ false -> State
+ end.
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
@@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self,
Member #member { pending_ack = PA1,
last_pub = PubCount }
end, Self, MembersState),
- State #state { members_state = MembersState1,
- broadcast_buffer = [] }.
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0}.
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index d54c2a8d..19171659 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
lists:foldl(
- fun(Module, {refused, _, _}) ->
- case Module:check_user_login(Username, AuthProps) of
- {error, E} ->
- {refused, "~s failed authenticating ~s: ~p~n",
- [Module, Username, E]};
- Else ->
- Else
+ fun ({ModN, ModZ}, {refused, _, _}) ->
+ %% Different modules for authN vs authZ. So authenticate
+ %% with authN module, then if that succeeds do
+ %% passwordless (i.e pre-authenticated) login with authZ
+ %% module, and use the #user{} the latter gives us.
+ case try_login(ModN, Username, AuthProps) of
+ {ok, _} -> try_login(ModZ, Username, []);
+ Else -> Else
end;
- (_, {ok, User}) ->
+ (Mod, {refused, _, _}) ->
+ %% Same module for authN and authZ. Just take the result
+ %% it gives us
+ try_login(Mod, Username, AuthProps);
+ (_, {ok, User}) ->
+ %% We've successfully authenticated. Skip to the end...
{ok, User}
end, {refused, "No modules checked '~s'", [Username]}, Modules).
+try_login(Module, Username, AuthProps) ->
+ case Module:check_user_login(Username, AuthProps) of
+ {error, E} -> {refused, "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ Else -> Else
+ end.
+
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
check_access(
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 55b98a0c..eeb0e0bf 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,8 +26,8 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
--export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
@@ -51,7 +51,7 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0, routing_result/0]).
+-export_type([name/0, qmsg/0]).
-type(name() :: rabbit_types:r('queue')).
-type(qpids() :: [pid()]).
@@ -61,7 +61,6 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(routing_result() :: 'routed' | 'unroutable').
-type(queue_or_absent() :: rabbit_types:amqqueue() |
{'absent', rabbit_types:amqqueue()}).
-type(not_found_or_absent() :: 'not_found' |
@@ -113,10 +112,9 @@
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
--spec(consumers/1 ::
- (rabbit_types:amqqueue())
- -> [{pid(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}]).
+-spec(consumers/1 :: (rabbit_types:amqqueue())
+ -> [{pid(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
@@ -139,9 +137,9 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
+ qpids()).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
+ qpids()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -151,9 +149,9 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/10 ::
+-spec(basic_consume/9 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
+ rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -161,7 +159,6 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -433,7 +430,7 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_max_length_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
@@ -443,7 +440,7 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
-check_max_length_arg({Type, Val}, Args) ->
+check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
@@ -556,8 +553,8 @@ requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
-reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
+reject(QPid, Requeue, MsgIds, ChPid) ->
+ delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
notify_down_all(QPids, ChPid) ->
{_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
@@ -578,13 +575,11 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
-basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid,
- LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
- ok = check_consume_arguments(QName, OtherArgs),
+basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
+ LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) ->
+ ok = check_consume_arguments(QName, Args),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
- OkMsg}).
+ ConsumerTag, ExclusiveConsume, Args, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -611,8 +606,6 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
-flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
-
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
%% this 'guarded' delete prevents unnecessary writes to the mnesia
@@ -710,17 +703,11 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
-deliver([], #delivery{mandatory = false}, _Flow) ->
+deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
- {routed, []};
-
-deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
- %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver
- %% will deliver the message to the queue process asynchronously,
- %% and return true, which means all the QPids will always be
- %% returned. It is therefore safe to use a fire-and-forget cast
- %% here and return the QPids - the semantics is preserved. This
- %% scales much better than the case below.
+ [];
+
+deliver(Qs, Delivery, Flow) ->
{MPids, SPids} = qpids(Qs),
QPids = MPids ++ SPids,
case Flow of
@@ -737,19 +724,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
SMsg = {deliver, Delivery, true, Flow},
delegate:cast(MPids, MMsg),
delegate:cast(SPids, SMsg),
- {routed, QPids};
-
-deliver(Qs, Delivery, _Flow) ->
- {MPids, SPids} = qpids(Qs),
- %% see comment above
- MMsg = {deliver, Delivery, false},
- SMsg = {deliver, Delivery, true},
- {MRouted, _} = delegate:call(MPids, MMsg),
- {SRouted, _} = delegate:call(SPids, SMsg),
- case MRouted ++ SRouted of
- [] -> {unroutable, []};
- R -> {routed, [QPid || {QPid, ok} <- R]}
- end.
+ QPids.
qpids([]) -> {[], []}; %% optimisation
qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 51032fc7..da8c0607 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -429,9 +429,10 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+send_or_record_confirm(#delivery{confirm = false}, State) ->
{never, State};
-send_or_record_confirm(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{confirm = true,
+ sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
@@ -440,11 +441,19 @@ send_or_record_confirm(#delivery{sender = SenderPid,
msg_id_to_channel = MTC}) ->
MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
{eventually, State#q{msg_id_to_channel = MTC1}};
-send_or_record_confirm(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{confirm = true,
+ sender = SenderPid,
msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
+
discard(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message{id = MsgId}}, State) ->
@@ -500,6 +509,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
@@ -889,11 +899,6 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
reply(rabbit_queue_consumers:all(Consumers), State);
-handle_call({deliver, Delivery, Delivered}, From, State) ->
- %% Synchronous, "mandatory" deliver mode.
- gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, Delivered, State));
-
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
@@ -924,7 +929,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
+ ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -932,8 +937,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
- CreditArgs, OtherArgs,
- is_empty(State), Consumers),
+ Args, is_empty(State), Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
@@ -943,7 +947,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), OtherArgs),
+ not NoAck, qname(State1), Args),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1046,7 +1050,6 @@ handle_cast({run_backing_queue, Mod, Fun},
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
- %% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
pmon:monitor(Sender, Senders);
@@ -1058,10 +1061,10 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(ack(AckTags, ChPid, State));
-handle_cast({reject, AckTags, true, ChPid}, State) ->
+handle_cast({reject, true, AckTags, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State) ->
+handle_cast({reject, false, AckTags, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
fun (X) -> subtract_acks(ChPid, AckTags, State,
@@ -1086,10 +1089,6 @@ handle_cast({activate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
ChPid, State));
-handle_cast({flush, ChPid}, State) ->
- ok = rabbit_channel:flushed(ChPid, self()),
- noreply(State);
-
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2e825536..a5dc6eb2 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,9 +20,9 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
- extract_headers/1, map_headers/2, delivery/3, header_routes/1,
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1,
parse_expiration/1]).
--export([build_content/2, from_content/1]).
+-export([build_content/2, from_content/1, msg_size/1]).
%%----------------------------------------------------------------------------
@@ -31,8 +31,7 @@
-type(properties_input() ::
(rabbit_framing:amqp_property_record() | [{atom(), any()}])).
-type(publish_result() ::
- ({ok, rabbit_amqqueue:routing_result(), [pid()]}
- | rabbit_types:error('not_found'))).
+ ({ok, [pid()]} | rabbit_types:error('not_found'))).
-type(headers() :: rabbit_framing:amqp_table() | 'undefined').
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
@@ -46,8 +45,8 @@
properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/3 ::
- (boolean(), rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -77,6 +76,9 @@
(rabbit_framing:amqp_property_record())
-> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
+-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) ->
+ non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -90,10 +92,10 @@ publish(Exchange, RoutingKeyBin, Properties, Body) ->
%% erlang distributed network.
publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
- publish(X, delivery(Mandatory, Message, undefined));
+ publish(X, delivery(Mandatory, false, Message, undefined));
publish(XName, RKey, Mandatory, Props, Body) ->
Message = message(XName, RKey, properties(Props), Body),
- publish(delivery(Mandatory, Message, undefined)).
+ publish(delivery(Mandatory, false, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
@@ -104,11 +106,11 @@ publish(Delivery = #delivery{
publish(X, Delivery) ->
Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
- {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
- {ok, RoutingRes, DeliveredQPids}.
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
+ {ok, DeliveredQPids}.
-delivery(Mandatory, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, sender = self(),
+delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
@@ -274,3 +276,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
{error, {leftover_string, S}}
end.
+msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR);
+msg_size(#basic_message{content = Content}) -> msg_size(Content).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 469cf4f7..4d866908 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,8 +21,7 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
- flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -37,9 +36,9 @@
conn_name, limiter, tx, next_tag, unacked_message_q, user,
virtual_host, most_recently_declared_queue,
queue_names, queue_monitors, consumer_mapping,
- blocking, queue_consumers, delivering_queues,
+ queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, capabilities, trace_state}).
+ unconfirmed, confirmed, mandatory, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -53,7 +52,6 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked,
state]).
-define(CREATION_EVENT_KEYS,
@@ -99,7 +97,6 @@
-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
-> 'ok').
--spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -149,9 +146,6 @@ send_credit_reply(Pid, Len) ->
send_drained(Pid, CTagCredit) ->
gen_server2:cast(Pid, {send_drained, CTagCredit}).
-flushed(Pid, QPid) ->
- gen_server2:cast(Pid, {flushed, QPid}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
@@ -213,7 +207,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
- blocking = sets:new(),
queue_consumers = dict:new(),
delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
@@ -221,6 +214,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
publish_seqno = 1,
unconfirmed = dtree:empty(),
confirmed = [],
+ mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
@@ -239,8 +233,9 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) ->
case Msg of
- {confirm, _MsgSeqNos, _QPid} -> 5;
- _ -> 0
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ {mandatory_received, _MsgSeqNo, _QPid} -> 5;
+ _ -> 0
end.
prioritise_info(Msg, _Len, _State) ->
@@ -268,13 +263,14 @@ handle_call(_Request, _From, State) ->
noreply(State).
handle_cast({method, Method, Content, Flow},
- State = #ch{reader_pid = Reader}) ->
+ State = #ch{reader_pid = Reader,
+ virtual_host = VHost}) ->
case Flow of
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
try handle_method(rabbit_channel_interceptor:intercept_method(
- expand_shortcuts(Method, State)),
+ expand_shortcuts(Method, State), VHost),
Content, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
@@ -291,9 +287,6 @@ handle_cast({method, Method, Content, Flow},
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast({flushed, QPid}, State) ->
- {noreply, queue_blocked(QPid, State), hibernate};
-
handle_cast(ready_for_close, State = #ch{state = closing,
writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
@@ -346,11 +339,14 @@ handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
-handle_cast({confirm, MsgSeqNos, From}, State) ->
- State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- Timeout = case C of [] -> hibernate; _ -> 0 end,
+handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
- {noreply, ensure_stats_timer(State1), Timeout}.
+ noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
+
+handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -368,8 +364,7 @@ handle_info(emit_stats, State) ->
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
- State2 = queue_blocked(QPid, State1),
- State3 = handle_consuming_queue_down(QPid, State2),
+ State3 = handle_consuming_queue_down(QPid, State1),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
#ch{queue_names = QNames, queue_monitors = QMons} = State4,
@@ -416,6 +411,10 @@ noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
next_state(State) -> ensure_stats_timer(send_confirms(State)).
+noreply_coalesce(State = #ch{confirmed = C}) ->
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ {noreply, ensure_stats_timer(State), Timeout}.
+
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -480,9 +479,8 @@ check_resource_access(User, Resource, Perm) ->
put(permission_cache, [V | CacheTail])
end.
-clear_permission_cache() ->
- erase(permission_cache),
- ok.
+clear_permission_cache() -> erase(permission_cache),
+ ok.
check_configure_permitted(Resource, #ch{user = User}) ->
check_resource_access(User, Resource, configure).
@@ -522,6 +520,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+check_msg_size(Content) ->
+ Size = rabbit_basic:msg_size(Content),
+ case Size > ?MAX_MSG_SIZE of
+ true -> precondition_failed("message size ~B larger than max size ~B",
+ [Size, ?MAX_MSG_SIZE]);
+ false -> ok
+ end.
+
qbin_to_resource(QueueNameBin, State) ->
name_to_resource(queue, QueueNameBin, State).
@@ -529,8 +535,7 @@ name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
rabbit_misc:r(VHostPath, Type, NameBin).
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
expand_queue_name_shortcut(QueueNameBin, _) ->
@@ -538,8 +543,7 @@ expand_queue_name_shortcut(QueueNameBin, _) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
@@ -593,31 +597,11 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
-queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case sets:is_element(QPid, Blocking) of
- false -> State;
- true -> maybe_send_flow_ok(
- State#ch{blocking = sets:del_element(QPid, Blocking)})
- end.
-
-maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
- case sets:size(Blocking) of
- 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
- _ -> ok
- end,
- State.
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
State#ch{confirmed = [MXs | C]}.
-confirm([], _QPid, State) ->
- State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
-
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
%% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
State1 = State#ch{state = running},
@@ -679,6 +663,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
+ check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -689,16 +674,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(Props, State),
check_expiration_header(Props),
+ DoConfirm = Tx =/= none orelse ConfirmEnabled,
{MsgSeqNo, State1} =
- case {Tx, ConfirmEnabled} of
- {none, false} -> {undefined, State};
- {_, _} -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case DoConfirm orelse Mandatory of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_in(Message, TraceState),
- Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(
+ Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
DQ = {Delivery, QNames},
{noreply, case Tx of
@@ -785,13 +772,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
- {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
- ActualConsumerTag, ExclusiveConsume,
- CreditArgs, OtherArgs,
+ ActualConsumerTag, ExclusiveConsume, Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -875,8 +860,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
PrefetchCount, queue:len(UAMQ)),
- {reply, #'basic.qos_ok'{},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
+ case ((not rabbit_limiter:is_active(Limiter)) andalso
+ rabbit_limiter:is_active(Limiter1)) of
+ true -> rabbit_amqqueue:activate_limit_all(
+ consumer_queues(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
@@ -1165,36 +1155,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true},
- _, State = #ch{limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:unblock(Limiter),
- {reply, #'channel.flow_ok'{active = true},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
-
-handle_method(#'channel.flow'{active = false},
- _, State = #ch{consumer_mapping = Consumers,
- limiter = Limiter}) ->
- case rabbit_limiter:is_blocked(Limiter) of
- true -> {noreply, maybe_send_flow_ok(State)};
- false -> Limiter1 = rabbit_limiter:block(Limiter),
- State1 = maybe_limit_queues(Limiter, Limiter1,
- State#ch{limiter = Limiter1}),
- %% The semantics of channel.flow{active=false}
- %% require that no messages are delivered after the
- %% channel.flow_ok has been sent. We accomplish that
- %% by "flushing" all messages in flight from the
- %% consumer queues to us. To do this we tell all the
- %% queues to invoke rabbit_channel:flushed/2, which
- %% will send us a {flushed, ...} message that appears
- %% *after* all the {deliver, ...} messages. We keep
- %% track of all the QPids thus asked, and once all of
- %% them have responded (or died) we send the
- %% channel.flow_ok.
- QPids = consumer_queues(Consumers),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, maybe_send_flow_ok(
- State1#ch{blocking = sets:from_list(QPids)})}
- end;
+handle_method(#'channel.flow'{active = true}, _, State) ->
+ {reply, #'channel.flow_ok'{active = true}, State};
+
+handle_method(#'channel.flow'{active = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "active=false", []);
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
@@ -1245,12 +1210,17 @@ monitor_delivering_queue(NoAck, QPid, QName,
false -> sets:add_element(QPid, DQ)
end}.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
+ mandatory = Mand}) ->
+ {MMsgs, Mand1} = dtree:take(QPid, Mand),
+ [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
+ State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- send_nacks(MXs, State#ch{unconfirmed = UC1});
+ send_nacks(MXs, State1#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State#ch{unconfirmed = UC1})
+ record_confirms(MXs, State1#ch{unconfirmed = UC1})
+
end.
handle_consuming_queue_down(QPid,
@@ -1279,16 +1249,6 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-parse_credit_args(Arguments) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
- end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
- undefined -> {none, Arguments}
- end.
-
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1328,7 +1288,9 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content},
- #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
+ State = #ch{protocol = Protocol, writer_pid = WriterPid},
+ Reason) ->
+ ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State),
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
@@ -1353,7 +1315,7 @@ reject(DeliveryTag, Requeue, Multiple,
reject(Requeue, Acked, Limiter) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self())
end, Acked),
ok = notify_limiter(Limiter, Acked).
@@ -1456,15 +1418,6 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-maybe_limit_queues(OldLimiter, NewLimiter, State) ->
- case ((not rabbit_limiter:is_active(OldLimiter)) andalso
- rabbit_limiter:is_active(NewLimiter)) of
- true -> Queues = consumer_queues(State#ch.consumer_mapping),
- rabbit_amqqueue:activate_limit_all(Queues, self());
- false -> ok
- end,
- State.
-
consumer_queues(Consumers) ->
lists:usort([QPid ||
{_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
@@ -1476,7 +1429,7 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
%% optimisation: avoid the potentially expensive 'foldl' in the
%% common case.
- case rabbit_limiter:is_prefetch_limited(Limiter) of
+ case rabbit_limiter:is_active(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1487,18 +1440,19 @@ notify_limiter(Limiter, Acked) ->
end.
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- msg_seq_no = undefined,
mandatory = false},
[]}, State) -> %% optimisation
?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
+ mandatory = Mandatory,
+ confirm = Confirm,
msg_seq_no = MsgSeqNo},
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
- {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery),
%% The pmon:monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1517,31 +1471,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
false -> dict:store(QPid, QName, QNames0)
end, pmon:monitor(QPid, QMons0)}
end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message,
- State#ch{queue_names = QNames1,
- queue_monitors = QMons1}),
+ State1 = State#ch{queue_names = QNames1,
+ queue_monitors = QMons1},
+ %% NB: the order here is important since basic.returns must be
+ %% sent before confirms.
+ State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo,
+ Message, State1),
+ State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
+ XName, State2),
?INCR_STATS([{exchange_stats, XName, 1} |
[{queue_exchange_stats, {QName, XName}, 1} ||
QPid <- DeliveredQPids,
{ok, QName} <- [dict:find(QPid, QNames1)]]],
- publish, State1),
- State1.
+ publish, State3),
+ State3.
+
+process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
+ State;
+process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ State;
+process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+ State#ch.mandatory)}.
-process_routing_result(routed, _, _, undefined, _, State) ->
+process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
State;
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
+process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)};
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
- case MsgSeqNo of
- undefined -> State;
- _ -> record_confirms([{MsgSeqNo, XName}], State)
- end.
+ State#ch.unconfirmed)}.
send_nacks([], State) ->
State;
@@ -1644,8 +1604,6 @@ i(state, #ch{state = running}) -> credit_flow:state();
i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
-i(client_flow_blocked, #ch{limiter = Limiter}) ->
- rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
@@ -1666,8 +1624,7 @@ update_measures(Type, Key, Inc, Measure) ->
end,
put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)).
-emit_stats(State) ->
- emit_stats(State, []).
+emit_stats(State) -> emit_stats(State, []).
emit_stats(State, Extra) ->
Coarse = infos(?STATISTICS_KEYS, State),
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 5d1665e0..2bd22579 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -22,7 +22,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([intercept_method/1]).
+-export([intercept_method/2]).
-ifdef(use_specs).
@@ -32,7 +32,7 @@
-callback description() -> [proplists:property()].
--callback intercept(original_method()) ->
+-callback intercept(original_method(), rabbit_types:vhost()) ->
rabbit_types:ok_or_error2(processed_method(), any()).
%% Whether the interceptor wishes to intercept the amqp method
@@ -43,7 +43,7 @@
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {intercept, 1}, {applies_to, 1}];
+ [{description, 0}, {intercept, 2}, {applies_to, 1}];
behaviour_info(_Other) ->
undefined.
@@ -51,15 +51,15 @@ behaviour_info(_Other) ->
%%----------------------------------------------------------------------------
-intercept_method(#'basic.publish'{} = M) ->
+intercept_method(#'basic.publish'{} = M, _VHost) ->
M;
-intercept_method(M) ->
- intercept_method(M, select(rabbit_misc:method_record_type(M))).
+intercept_method(M, VHost) ->
+ intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))).
-intercept_method(M, []) ->
+intercept_method(M, _VHost, []) ->
M;
-intercept_method(M, [I]) ->
- case I:intercept(M) of
+intercept_method(M, VHost, [I]) ->
+ case I:intercept(M, VHost) of
{ok, M2} ->
case validate_method(M, M2) of
true ->
@@ -74,7 +74,7 @@ intercept_method(M, [I]) ->
internal_error("Interceptor: ~p failed with reason: ~p",
[I, Reason])
end;
-intercept_method(M, Is) ->
+intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
[rabbit_misc:method_record_type(M), Is]).
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 640b282e..b8a2cc9c 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -34,7 +34,7 @@
publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
{Queues, Cycles} = detect_cycles(Reason, DLMsg,
rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index ab8c62fe..447cd893 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -87,7 +87,7 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
- {ok, _RoutingRes, _DeliveredQPids} =
+ {ok, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index d59641b0..9421b52e 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -79,6 +79,25 @@ init_file(File, PrevHandler) ->
%% filter out "application: foo; exited: stopped; type: temporary"
handle_event({info_report, _, {_, std_info, _}}, State) ->
{ok, State};
+%% When a node restarts quickly it is possible the rest of the cluster
+%% will not have had the chance to remove its queues from
+%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes
+%% on_node_down(node()). But before we get there we can receive lots
+%% of messages intended for the old version of the node. The emulator
+%% logs an event for every one of those messages; in extremis this can
+%% bring the server to its knees just logging "Discarding..."
+%% again and again. So just log the first one, then go silent.
+handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
+ State) ->
+ case get(discarding_message_seen) of
+ true -> {ok, State};
+ undefined -> put(discarding_message_seen, true),
+ error_logger_file_h:handle_event(Event, State)
+ end;
+%% Clear this state if we log anything else (but not a progress report).
+handle_event(Event = {info_msg, _, _}, State) ->
+ erase(discarding_message_seen),
+ error_logger_file_h:handle_event(Event, State);
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index da728033..d37b356c 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -17,8 +17,7 @@
%% The purpose of the limiter is to stem the flow of messages from
%% queues to channels, in order to act upon various protocol-level
%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
-%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer)
-%% credit mechanism.
+%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism.
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
@@ -65,11 +64,9 @@
%%
%% 1. Channels tell the limiter about basic.qos prefetch counts -
%% that's what the limit_prefetch/3, unlimit_prefetch/1,
-%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
-%% about - and channel.flow blocking - that's what block/1,
-%% unblock/1 and is_blocked/1 are for. They also tell the limiter
-%% queue state (via the queue) about consumer credit changes -
-%% that's what credit/5 is for.
+%% get_prefetch_limit/1 API functions are about. They also tell the
+%% limiter queue state (via the queue) about consumer credit
+%% changes - that's what credit/5 is for.
%%
%% 2. Queues also tell the limiter queue state about the queue
%% becoming empty (via drained/1) and consumers leaving (via
@@ -83,12 +80,11 @@
%%
%% 5. Queues ask the limiter for permission (with can_send/3) whenever
%% they want to deliver a message to a channel. The limiter checks
-%% whether a) the channel isn't blocked by channel.flow, b) the
-%% volume has not yet reached the prefetch limit, and c) whether
-%% the consumer has enough credit. If so it increments the volume
-%% and tells the queue to proceed. Otherwise it marks the queue as
-%% requiring notification (see below) and tells the queue not to
-%% proceed.
+%% whether a) the volume has not yet reached the prefetch limit,
+%% and b) whether the consumer has enough credit. If so it
+%% increments the volume and tells the queue to proceed. Otherwise
+%% it marks the queue as requiring notification (see below) and
+%% tells the queue not to proceed.
%%
%% 6. A queue that has been told to proceed (by the return value of
%% can_send/3) sends the message to the channel. Conversely, a
@@ -123,8 +119,7 @@
-export([start_link/1]).
%% channel API
--export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
- is_prefetch_limited/1, is_blocked/1, is_active/1,
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, is_active/1,
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
@@ -136,14 +131,13 @@
%%----------------------------------------------------------------------------
--record(lstate, {pid, prefetch_limited, blocked}).
+-record(lstate, {pid, prefetch_limited}).
-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
-type(lstate() :: #lstate{pid :: pid(),
- prefetch_limited :: boolean(),
- blocked :: boolean()}).
+ prefetch_limited :: boolean()}).
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
@@ -154,10 +148,6 @@
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
-> lstate()).
-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
--spec(block/1 :: (lstate()) -> lstate()).
--spec(unblock/1 :: (lstate()) -> lstate()).
--spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
--spec(is_blocked/1 :: (lstate()) -> boolean()).
-spec(is_active/1 :: (lstate()) -> boolean()).
-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
@@ -183,7 +173,6 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- blocked = false,
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -201,7 +190,7 @@ start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
ok = gen_server:call(Pid, {new, self()}, infinity),
- #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
+ #lstate{pid = Pid, prefetch_limited = false}.
limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
ok = gen_server:call(
@@ -213,19 +202,7 @@ unlimit_prefetch(L) ->
ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity),
L#lstate{prefetch_limited = false}.
-block(L) ->
- ok = gen_server:call(L#lstate.pid, block, infinity),
- L#lstate{blocked = true}.
-
-unblock(L) ->
- ok = gen_server:call(L#lstate.pid, unblock, infinity),
- L#lstate{blocked = false}.
-
-is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
-
-is_blocked(#lstate{blocked = Blocked}) -> Blocked.
-
-is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
+is_active(#lstate{prefetch_limited = Limited}) -> Limited.
get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
get_prefetch_limit(L) ->
@@ -349,19 +326,10 @@ handle_call(unlimit_prefetch, _From, State) ->
{reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
volume = 0})};
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call(unblock, _From, State) ->
- {reply, ok, maybe_notify(State, State#lim{blocked = false})};
-
handle_call(get_prefetch_limit, _From,
State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
-handle_call({can_send, QPid, _AckRequired}, _From,
- State = #lim{blocked = true}) ->
- {reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case prefetch_limit_reached(State) of
@@ -397,8 +365,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
- not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
+ case prefetch_limit_reached(OldState) andalso
+ not prefetch_limit_reached(NewState) of
true -> notify_queues(NewState);
false -> NewState
end.
@@ -406,8 +374,6 @@ maybe_notify(OldState, NewState) ->
prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-blocked(#lim{blocked = Blocked}) -> Blocked.
-
remember_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4f50e1a5..9ce5afcb 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 908e4783..bea7e0d0 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -57,7 +57,6 @@
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
--type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
-spec new() -> state().
@@ -68,8 +67,7 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- credit_args(), rabbit_framing:amqp_table(), boolean(),
- state()) -> state().
+ rabbit_framing:amqp_table(), boolean(), state()) -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
-spec erase_ch(ch(), state()) ->
@@ -120,8 +118,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
- IsEmpty, State = #state{consumers = Consumers}) ->
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -129,34 +127,34 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
false -> Limiter
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
- update_ch_record(case CreditArgs of
+ update_ch_record(case parse_credit_args(Args) of
none -> C1;
{Crd, Drain} -> credit_and_drain(
- C1, ConsumerTag, Crd, Drain, IsEmpty)
+ C1, CTag, Crd, Drain, IsEmpty)
end),
- Consumer = #consumer{tag = ConsumerTag,
+ Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
- args = OtherArgs},
+ args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
-remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
+remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{consumer_count = Count,
limiter = Limiter,
blocked_consumers = Blocked} ->
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ Blocked1 = remove_consumer(ChPid, CTag, Blocked),
Limiter1 = case Count of
1 -> rabbit_limiter:deactivate(Limiter);
_ -> Limiter
end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag),
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
State#state{consumers =
- remove_consumer(ChPid, ConsumerTag, Consumers)}
+ remove_consumer(ChPid, CTag, Consumers)}
end.
erase_ch(ChPid, State = #state{consumers = Consumers}) ->
@@ -215,14 +213,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
end.
deliver_to_consumer(FetchFun,
- #consumer{tag = ConsumerTag,
+ #consumer{tag = CTag,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
QName) ->
{{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
true -> queue:in(AckTag, ChAckTags);
@@ -324,6 +322,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
%%----------------------------------------------------------------------------
+parse_credit_args(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;
@@ -399,9 +407,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
end,
priority_queue:in({ChPid, Consumer}, Priority, Queue).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
+remove_consumer(ChPid, CTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= CTag)
end, Queue).
remove_consumers(ChPid, Queue) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 8553e36d..64debcab 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -285,8 +285,11 @@ recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
throw({become, F(Deb, Buf, BufLen, State)});
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
- ok = rabbit_net:setopts(Sock, [{active, once}]),
- mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true});
+ case rabbit_net:setopts(Sock, [{active, once}]) of
+ ok -> mainloop(Deb, Buf, BufLen,
+ State#v1{pending_recv = true});
+ {error, Reason} -> stop(Reason, State)
+ end;
recvloop(Deb, [B], _BufLen, State) ->
{Rest, State1} = handle_input(State#v1.callback, B, State),
recvloop(Deb, [Rest], size(Rest), State1);
@@ -312,11 +315,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
- maybe_emit_stats(State),
- throw(connection_closed_abruptly);
+ stop(closed, State);
{error, Reason} ->
- maybe_emit_stats(State),
- throw({inet_error, Reason});
+ stop(Reason, State);
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
?MODULE, Deb, {Buf, BufLen, State});
@@ -327,6 +328,11 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
end
end.
+stop(closed, State) -> maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+stop(Reason, State) -> maybe_emit_stats(State),
+ throw({inet_error, Reason}).
+
handle_other({conserve_resources, Source, Conserve},
State = #v1{throttle = Throttle =
#throttle{alarmed_by = CR}}) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index af2a6e86..2d6ff73b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1172,7 +1172,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, [], undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -2410,8 +2410,8 @@ publish_and_confirm(Q, Payload, Count) ->
<<>>, #'P_basic'{delivery_mode = 2},
Payload),
Delivery = #delivery{mandatory = false, sender = self(),
- message = Msg, msg_seq_no = Seq},
- {routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
+ confirm = true, message = Msg, msg_seq_no = Seq},
+ _QPids = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index d0dcaa71..b08a9a1c 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -88,9 +88,9 @@ trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
ok;
trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}},
RKPrefix, RKSuffix, Extra) ->
- {ok, _, _} = rabbit_basic:publish(
- X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
- #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
+ {ok, _} = rabbit_basic:publish(
+ X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
ok.
msg_to_table(#basic_message{exchange_name = #resource{name = XName},