diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-30 15:58:41 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-30 15:58:41 +0000 |
commit | a51739f17601cea3e6e7760d146b51798a64bc94 (patch) | |
tree | f9a07853bbf7377d1ee4f382c3f5c3adbeca4534 | |
parent | 95bce23e626c5911ea7787733437f0d8b6ed42cd (diff) | |
parent | 1cb35e55a99a2733d8237168e7e67868ff8c9651 (diff) | |
download | rabbitmq-server-a51739f17601cea3e6e7760d146b51798a64bc94.tar.gz |
merge bug25827 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 8 | ||||
-rw-r--r-- | include/rabbit.hrl | 9 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rw-r--r-- | src/dtree.erl | 11 | ||||
-rw-r--r-- | src/gm.erl | 106 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 29 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 71 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 28 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 233 | ||||
-rw-r--r-- | src/rabbit_channel_interceptor.erl | 20 | ||||
-rw-r--r-- | src/rabbit_dead_letter.erl | 2 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_error_logger_file_h.erl | 19 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 66 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 46 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 18 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 6 |
21 files changed, 366 insertions, 366 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d2a3f7c7..d19acd00 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1559,14 +1559,6 @@ <term>prefetch_count</term> <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> </varlistentry> - <varlistentry> - <term>client_flow_blocked</term> - <listitem><para>True if the client issued a - <command>channel.flow{active=false}</command> - command, blocking the server from delivering - messages to the channel's consumers. - </para></listitem> - </varlistentry> </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index afb6e576..6d117e3d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,7 +70,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, sender, message, msg_seq_no}). +-record(delivery, {mandatory, confirm, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). @@ -112,4 +112,11 @@ -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). +%% Trying to send a term across a cluster larger than 2^31 bytes will +%% cause the VM to exit with "Absurdly large distribution output data +%% buffer". So we limit the max message size to 2^31 - 10^6 bytes (1MB +%% to allow plenty of leeway for the #basic_message{} and #content{} +%% wrapping the message body). +-define(MAX_MSG_SIZE, 2147383648). + -define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index f53eea95..a96ccb35 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -130,6 +130,9 @@ done rm -rf %{buildroot} %changelog +* Thu Jan 23 2014 emile@rabbitmq.com 3.2.3-1 +- New Upstream Release + * Tue Dec 10 2013 emile@rabbitmq.com 3.2.2-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index f1e1c66b..7138409c 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (3.2.3-1) unstable; urgency=low + + * New Upstream Release + + -- Emile Joubert <emile@rabbitmq.com> Thu, 23 Jan 2014 14:46:37 +0000 + rabbitmq-server (3.2.2-1) unstable; urgency=low * New Upstream Release diff --git a/src/dtree.erl b/src/dtree.erl index 5ff36bd9..72abe248 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -32,7 +32,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, take_all/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -53,6 +53,7 @@ -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(drop/2 :: (pk(), ?MODULE()) -> ?MODULE()). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -120,6 +121,14 @@ take_all(SK, {P, S}) -> {KVs, {P1, prune(SKS, PKS, S)}} end. +%% Drop all entries for the given primary key (which does not have to exist). +drop(PK, {P, S}) -> + case gb_trees:lookup(PK, P) of + none -> {P, S}; + {value, {SKS, _V}} -> {gb_trees:delete(PK, P), + prune(SKS, gb_sets:singleton(PK), S)} + end. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). @@ -382,7 +382,7 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/4, leave/1, broadcast/2, +-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -395,6 +395,7 @@ -export([table_definitions/0]). -define(GROUP_TABLE, gm_group). +-define(MAX_BUFFER_SIZE, 100000000). %% 100MB -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). @@ -414,6 +415,7 @@ callback_args, confirms, broadcast_buffer, + broadcast_buffer_sz, broadcast_timer, txn_executor }). @@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) -> leave(Server) -> gen_server2:cast(Server, leave). -broadcast(Server, Msg) -> - gen_server2:cast(Server, {broadcast, Msg}). +broadcast(Server, Msg) -> broadcast(Server, Msg, 0). + +broadcast(Server, Msg, SizeHint) -> + gen_server2:cast(Server, {broadcast, Msg, SizeHint}). confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). @@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) -> random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), gen_server2:cast(self(), join), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = -1, - members_state = undefined, - callback_args = Args, - confirms = queue:new(), - broadcast_buffer = [], - broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = -1, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_buffer_sz = 0, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From, ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> - internal_broadcast(Msg, From, State); + internal_broadcast(Msg, From, 0, State); handle_call(info, _From, State = #state { members_state = undefined }) -> @@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); -handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> +handle_cast({broadcast, _Msg, _SizeHint}, + State = #state { members_state = undefined }) -> noreply(State); -handle_cast({broadcast, Msg}, +handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, module = Module, @@ -650,8 +656,8 @@ handle_cast({broadcast, Msg}, handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), State}); -handle_cast({broadcast, Msg}, State) -> - internal_broadcast(Msg, none, State); +handle_cast({broadcast, Msg, SizeHint}, State) -> + internal_broadcast(Msg, none, SizeHint, State); handle_cast(join, State = #state { self = Self, group_name = GroupName, @@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> ensure_broadcast_timer(State) -> State. -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - module = Module, - confirms = Confirms, - callback_args = Args, - broadcast_buffer = Buffer }) -> +internal_broadcast(Msg, From, SizeHint, + State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer, + broadcast_buffer_sz = BufferSize }) -> PubCount1 = PubCount + 1, Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount1, Msg} | Buffer], @@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self, none -> Confirms; _ -> queue:in({PubCount1, From}, Confirms) end, - State1 = State #state { pub_count = PubCount1, - confirms = Confirms1, - broadcast_buffer = Buffer1 }, - handle_callback_result({Result, case From of - none -> State1; - _ -> flush_broadcast_buffer(State1) - end}). + State1 = State #state { pub_count = PubCount1, + confirms = Confirms1, + broadcast_buffer = Buffer1, + broadcast_buffer_sz = BufferSize + SizeHint}, + handle_callback_result( + {Result, case From of + none -> maybe_flush_broadcast_buffer(State1); + _ -> flush_broadcast_buffer(State1) + end}). + +%% The Erlang distribution mechanism has an interesting quirk - it +%% will kill the VM cold with "Absurdly large distribution output data +%% buffer" if you attempt to send a message which serialises out to +%% more than 2^31 bytes in size. It's therefore a very good idea to +%% make sure that we don't exceed that size! +%% +%% Now, we could figure out the size of messages as they come in using +%% size(term_to_binary(Msg)) or similar. The trouble is, that requires +%% us to serialise the message only to throw the serialised form +%% away. Hard to believe that's a sensible thing to do. So instead we +%% accept a size hint from the application, via broadcast/3. This size +%% hint can be the size of anything in the message which we expect +%% could be large, and we just ignore the size of any small bits of +%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat +%% conservatively at 100MB - but the buffer is only to allow us to +%% buffer tiny messages anyway, so 100MB is plenty. + +maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) -> + case Size > ?MAX_BUFFER_SIZE of + true -> flush_broadcast_buffer(State); + false -> State + end. flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; @@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self, Member #member { pending_ack = PA1, last_pub = PubCount } end, Self, MembersState), - State #state { members_state = MembersState1, - broadcast_buffer = [] }. + State #state { members_state = MembersState1, + broadcast_buffer = [], + broadcast_buffer_sz = 0}. %% --------------------------------------------------------------------------- diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index d54c2a8d..19171659 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), lists:foldl( - fun(Module, {refused, _, _}) -> - case Module:check_user_login(Username, AuthProps) of - {error, E} -> - {refused, "~s failed authenticating ~s: ~p~n", - [Module, Username, E]}; - Else -> - Else + fun ({ModN, ModZ}, {refused, _, _}) -> + %% Different modules for authN vs authZ. So authenticate + %% with authN module, then if that succeeds do + %% passwordless (i.e pre-authenticated) login with authZ + %% module, and use the #user{} the latter gives us. + case try_login(ModN, Username, AuthProps) of + {ok, _} -> try_login(ModZ, Username, []); + Else -> Else end; - (_, {ok, User}) -> + (Mod, {refused, _, _}) -> + %% Same module for authN and authZ. Just take the result + %% it gives us + try_login(Mod, Username, AuthProps); + (_, {ok, User}) -> + %% We've successfully authenticated. Skip to the end... {ok, User} end, {refused, "No modules checked '~s'", [Username]}, Modules). +try_login(Module, Username, AuthProps) -> + case Module:check_user_login(Username, AuthProps) of + {error, E} -> {refused, "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + Else -> Else + end. + check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> check_access( diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 55b98a0c..eeb0e0bf 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,8 +26,8 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). --export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). +-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). @@ -51,7 +51,7 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0, routing_result/0]). +-export_type([name/0, qmsg/0]). -type(name() :: rabbit_types:r('queue')). -type(qpids() :: [pid()]). @@ -61,7 +61,6 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(routing_result() :: 'routed' | 'unroutable'). -type(queue_or_absent() :: rabbit_types:amqqueue() | {'absent', rabbit_types:amqqueue()}). -type(not_found_or_absent() :: 'not_found' | @@ -113,10 +112,9 @@ -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). --spec(consumers/1 :: - (rabbit_types:amqqueue()) - -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). +-spec(consumers/1 :: (rabbit_types:amqqueue()) + -> [{pid(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) @@ -139,9 +137,9 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). + qpids()). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). + qpids()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -151,9 +149,9 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/10 :: +-spec(basic_consume/9 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any()) + rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -161,7 +159,6 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(resume/2 :: (pid(), pid()) -> 'ok'). --spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -433,7 +430,7 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_max_length_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}]. @@ -443,7 +440,7 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. -check_max_length_arg({Type, Val}, Args) -> +check_non_neg_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val >= 0 -> ok; ok -> {error, {value_negative, Val}}; @@ -556,8 +553,8 @@ requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). -reject(QPid, MsgIds, Requeue, ChPid) -> - delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}). +reject(QPid, Requeue, MsgIds, ChPid) -> + delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}). notify_down_all(QPids, ChPid) -> {_, Bads} = delegate:call(QPids, {notify_down, ChPid}), @@ -578,13 +575,11 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). -basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, - LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) -> - ok = check_consume_arguments(QName, OtherArgs), +basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, + LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) -> + ok = check_consume_arguments(QName, Args), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, - OkMsg}). + ConsumerTag, ExclusiveConsume, Args, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -611,8 +606,6 @@ notify_sent_queue_down(QPid) -> resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). -flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). - internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), %% this 'guarded' delete prevents unnecessary writes to the mnesia @@ -710,17 +703,11 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. -deliver([], #delivery{mandatory = false}, _Flow) -> +deliver([], _Delivery, _Flow) -> %% /dev/null optimisation - {routed, []}; - -deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> - %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver - %% will deliver the message to the queue process asynchronously, - %% and return true, which means all the QPids will always be - %% returned. It is therefore safe to use a fire-and-forget cast - %% here and return the QPids - the semantics is preserved. This - %% scales much better than the case below. + []; + +deliver(Qs, Delivery, Flow) -> {MPids, SPids} = qpids(Qs), QPids = MPids ++ SPids, case Flow of @@ -737,19 +724,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> SMsg = {deliver, Delivery, true, Flow}, delegate:cast(MPids, MMsg), delegate:cast(SPids, SMsg), - {routed, QPids}; - -deliver(Qs, Delivery, _Flow) -> - {MPids, SPids} = qpids(Qs), - %% see comment above - MMsg = {deliver, Delivery, false}, - SMsg = {deliver, Delivery, true}, - {MRouted, _} = delegate:call(MPids, MMsg), - {SRouted, _} = delegate:call(SPids, SMsg), - case MRouted ++ SRouted of - [] -> {unroutable, []}; - R -> {routed, [QPid || {QPid, ok} <- R]} - end. + QPids. qpids([]) -> {[], []}; %% optimisation qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 51032fc7..da8c0607 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -429,9 +429,10 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. -send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) -> +send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; -send_or_record_confirm(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{confirm = true, + sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, @@ -440,11 +441,19 @@ send_or_record_confirm(#delivery{sender = SenderPid, msg_id_to_channel = MTC}) -> MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), {eventually, State#q{msg_id_to_channel = MTC1}}; -send_or_record_confirm(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{confirm = true, + sender = SenderPid, msg_seq_no = MsgSeqNo}, State) -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). + discard(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message{id = MsgId}}, State) -> @@ -500,6 +509,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), @@ -889,11 +899,6 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State = #q{consumers = Consumers}) -> reply(rabbit_queue_consumers:all(Consumers), State); -handle_call({deliver, Delivery, Delivered}, From, State) -> - %% Synchronous, "mandatory" deliver mode. - gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, Delivered, State)); - handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the @@ -924,7 +929,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, + ConsumerTag, ExclusiveConsume, Args, OkMsg}, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of @@ -932,8 +937,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, - CreditArgs, OtherArgs, - is_empty(State), Consumers), + Args, is_empty(State), Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder @@ -943,7 +947,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), OtherArgs), + not NoAck, qname(State1), Args), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1046,7 +1050,6 @@ handle_cast({run_backing_queue, Mod, Fun}, handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> - %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of flow -> credit_flow:ack(Sender), pmon:monitor(Sender, Senders); @@ -1058,10 +1061,10 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, handle_cast({ack, AckTags, ChPid}, State) -> noreply(ack(AckTags, ChPid, State)); -handle_cast({reject, AckTags, true, ChPid}, State) -> +handle_cast({reject, true, AckTags, ChPid}, State) -> noreply(requeue(AckTags, ChPid, State)); -handle_cast({reject, AckTags, false, ChPid}, State) -> +handle_cast({reject, false, AckTags, ChPid}, State) -> noreply(with_dlx( State#q.dlx, fun (X) -> subtract_acks(ChPid, AckTags, State, @@ -1086,10 +1089,6 @@ handle_cast({activate_limit, ChPid}, State) -> noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(), ChPid, State)); -handle_cast({flush, ChPid}, State) -> - ok = rabbit_channel:flushed(ChPid, self()), - noreply(State); - handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2e825536..a5dc6eb2 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,9 +20,9 @@ -export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, prepend_table_header/3, - extract_headers/1, map_headers/2, delivery/3, header_routes/1, + extract_headers/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1]). --export([build_content/2, from_content/1]). +-export([build_content/2, from_content/1, msg_size/1]). %%---------------------------------------------------------------------------- @@ -31,8 +31,7 @@ -type(properties_input() :: (rabbit_framing:amqp_property_record() | [{atom(), any()}])). -type(publish_result() :: - ({ok, rabbit_amqqueue:routing_result(), [pid()]} - | rabbit_types:error('not_found'))). + ({ok, [pid()]} | rabbit_types:error('not_found'))). -type(headers() :: rabbit_framing:amqp_table() | 'undefined'). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). @@ -46,8 +45,8 @@ properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/3 :: - (boolean(), rabbit_types:message(), undefined | integer()) -> +-spec(delivery/4 :: + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -77,6 +76,9 @@ (rabbit_framing:amqp_property_record()) -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). +-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) -> + non_neg_integer()). + -endif. %%---------------------------------------------------------------------------- @@ -90,10 +92,10 @@ publish(Exchange, RoutingKeyBin, Properties, Body) -> %% erlang distributed network. publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), - publish(X, delivery(Mandatory, Message, undefined)); + publish(X, delivery(Mandatory, false, Message, undefined)); publish(XName, RKey, Mandatory, Props, Body) -> Message = message(XName, RKey, properties(Props), Body), - publish(delivery(Mandatory, Message, undefined)). + publish(delivery(Mandatory, false, Message, undefined)). publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> @@ -104,11 +106,11 @@ publish(Delivery = #delivery{ publish(X, Delivery) -> Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), - {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), - {ok, RoutingRes, DeliveredQPids}. + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), + {ok, DeliveredQPids}. -delivery(Mandatory, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, sender = self(), +delivery(Mandatory, Confirm, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> @@ -274,3 +276,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) -> {error, {leftover_string, S}} end. +msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR); +msg_size(#basic_message{content = Content}) -> msg_size(Content). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 469cf4f7..4d866908 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,8 +21,7 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, - flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -37,9 +36,9 @@ conn_name, limiter, tx, next_tag, unacked_message_q, user, virtual_host, most_recently_declared_queue, queue_names, queue_monitors, consumer_mapping, - blocking, queue_consumers, delivering_queues, + queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, capabilities, trace_state}). + unconfirmed, confirmed, mandatory, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -53,7 +52,6 @@ messages_uncommitted, acks_uncommitted, prefetch_count, - client_flow_blocked, state]). -define(CREATION_EVENT_KEYS, @@ -99,7 +97,6 @@ -spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'). --spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -149,9 +146,6 @@ send_credit_reply(Pid, Len) -> send_drained(Pid, CTagCredit) -> gen_server2:cast(Pid, {send_drained, CTagCredit}). -flushed(Pid, QPid) -> - gen_server2:cast(Pid, {flushed, QPid}). - list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_channel, list_local, []). @@ -213,7 +207,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), - blocking = sets:new(), queue_consumers = dict:new(), delivering_queues = sets:new(), queue_collector_pid = CollectorPid, @@ -221,6 +214,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, publish_seqno = 1, unconfirmed = dtree:empty(), confirmed = [], + mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), @@ -239,8 +233,9 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_cast(Msg, _Len, _State) -> case Msg of - {confirm, _MsgSeqNos, _QPid} -> 5; - _ -> 0 + {confirm, _MsgSeqNos, _QPid} -> 5; + {mandatory_received, _MsgSeqNo, _QPid} -> 5; + _ -> 0 end. prioritise_info(Msg, _Len, _State) -> @@ -268,13 +263,14 @@ handle_call(_Request, _From, State) -> noreply(State). handle_cast({method, Method, Content, Flow}, - State = #ch{reader_pid = Reader}) -> + State = #ch{reader_pid = Reader, + virtual_host = VHost}) -> case Flow of flow -> credit_flow:ack(Reader); noflow -> ok end, try handle_method(rabbit_channel_interceptor:intercept_method( - expand_shortcuts(Method, State)), + expand_shortcuts(Method, State), VHost), Content, State) of {reply, Reply, NewState} -> ok = send(Reply, NewState), @@ -291,9 +287,6 @@ handle_cast({method, Method, Content, Flow}, {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast({flushed, QPid}, State) -> - {noreply, queue_blocked(QPid, State), hibernate}; - handle_cast(ready_for_close, State = #ch{state = closing, writer_pid = WriterPid}) -> ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), @@ -346,11 +339,14 @@ handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); -handle_cast({confirm, MsgSeqNos, From}, State) -> - State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), - Timeout = case C of [] -> hibernate; _ -> 0 end, +handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> %% NB: don't call noreply/1 since we don't want to send confirms. - {noreply, ensure_stats_timer(State1), Timeout}. + noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)}); + +handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), + %% NB: don't call noreply/1 since we don't want to send confirms. + noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})). handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), @@ -368,8 +364,7 @@ handle_info(emit_stats, State) -> handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), - State2 = queue_blocked(QPid, State1), - State3 = handle_consuming_queue_down(QPid, State2), + State3 = handle_consuming_queue_down(QPid, State1), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), #ch{queue_names = QNames, queue_monitors = QMons} = State4, @@ -416,6 +411,10 @@ noreply(NewState) -> {noreply, next_state(NewState), hibernate}. next_state(State) -> ensure_stats_timer(send_confirms(State)). +noreply_coalesce(State = #ch{confirmed = C}) -> + Timeout = case C of [] -> hibernate; _ -> 0 end, + {noreply, ensure_stats_timer(State), Timeout}. + ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats). @@ -480,9 +479,8 @@ check_resource_access(User, Resource, Perm) -> put(permission_cache, [V | CacheTail]) end. -clear_permission_cache() -> - erase(permission_cache), - ok. +clear_permission_cache() -> erase(permission_cache), + ok. check_configure_permitted(Resource, #ch{user = User}) -> check_resource_access(User, Resource, configure). @@ -522,6 +520,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +check_msg_size(Content) -> + Size = rabbit_basic:msg_size(Content), + case Size > ?MAX_MSG_SIZE of + true -> precondition_failed("message size ~B larger than max size ~B", + [Size, ?MAX_MSG_SIZE]); + false -> ok + end. + qbin_to_resource(QueueNameBin, State) -> name_to_resource(queue, QueueNameBin, State). @@ -529,8 +535,7 @@ name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> rabbit_misc:r(VHostPath, Type, NameBin). expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; expand_queue_name_shortcut(QueueNameBin, _) -> @@ -538,8 +543,7 @@ expand_queue_name_shortcut(QueueNameBin, _) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; @@ -593,31 +597,11 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. -queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case sets:is_element(QPid, Blocking) of - false -> State; - true -> maybe_send_flow_ok( - State#ch{blocking = sets:del_element(QPid, Blocking)}) - end. - -maybe_send_flow_ok(State = #ch{blocking = Blocking}) -> - case sets:size(Blocking) of - 0 -> ok = send(#'channel.flow_ok'{active = false}, State); - _ -> ok - end, - State. - record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> State#ch{confirmed = [MXs | C]}. -confirm([], _QPid, State) -> - State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), - record_confirms(MXs, State#ch{unconfirmed = UC1}). - handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? State1 = State#ch{state = running}, @@ -679,6 +663,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> + check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -689,16 +674,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(Props, State), check_expiration_header(Props), + DoConfirm = Tx =/= none orelse ConfirmEnabled, {MsgSeqNo, State1} = - case {Tx, ConfirmEnabled} of - {none, false} -> {undefined, State}; - {_, _} -> SeqNo = State#ch.publish_seqno, - {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + case DoConfirm orelse Mandatory of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_in(Message, TraceState), - Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), + Delivery = rabbit_basic:delivery( + Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), DQ = {Delivery, QNames}, {noreply, case Tx of @@ -785,13 +772,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> - {CreditArgs, OtherArgs} = parse_credit_args(Args), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), - ActualConsumerTag, ExclusiveConsume, - CreditArgs, OtherArgs, + ActualConsumerTag, ExclusiveConsume, Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -875,8 +860,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, PrefetchCount, queue:len(UAMQ)), - {reply, #'basic.qos_ok'{}, - maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; + case ((not rabbit_limiter:is_active(Limiter)) andalso + rabbit_limiter:is_active(Limiter1)) of + true -> rabbit_amqqueue:activate_limit_all( + consumer_queues(State#ch.consumer_mapping), self()); + false -> ok + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1165,36 +1155,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, - _, State = #ch{limiter = Limiter}) -> - Limiter1 = rabbit_limiter:unblock(Limiter), - {reply, #'channel.flow_ok'{active = true}, - maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; - -handle_method(#'channel.flow'{active = false}, - _, State = #ch{consumer_mapping = Consumers, - limiter = Limiter}) -> - case rabbit_limiter:is_blocked(Limiter) of - true -> {noreply, maybe_send_flow_ok(State)}; - false -> Limiter1 = rabbit_limiter:block(Limiter), - State1 = maybe_limit_queues(Limiter, Limiter1, - State#ch{limiter = Limiter1}), - %% The semantics of channel.flow{active=false} - %% require that no messages are delivered after the - %% channel.flow_ok has been sent. We accomplish that - %% by "flushing" all messages in flight from the - %% consumer queues to us. To do this we tell all the - %% queues to invoke rabbit_channel:flushed/2, which - %% will send us a {flushed, ...} message that appears - %% *after* all the {deliver, ...} messages. We keep - %% track of all the QPids thus asked, and once all of - %% them have responded (or died) we send the - %% channel.flow_ok. - QPids = consumer_queues(Consumers), - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, maybe_send_flow_ok( - State1#ch{blocking = sets:from_list(QPids)})} - end; +handle_method(#'channel.flow'{active = true}, _, State) -> + {reply, #'channel.flow_ok'{active = true}, State}; + +handle_method(#'channel.flow'{active = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "active=false", []); handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, @@ -1245,12 +1210,17 @@ monitor_delivering_queue(NoAck, QPid, QName, false -> sets:add_element(QPid, DQ) end}. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, + mandatory = Mand}) -> + {MMsgs, Mand1} = dtree:take(QPid, Mand), + [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs], + State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), - send_nacks(MXs, State#ch{unconfirmed = UC1}); + send_nacks(MXs, State1#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State#ch{unconfirmed = UC1}) + record_confirms(MXs, State1#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, @@ -1279,16 +1249,6 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -parse_credit_args(Arguments) -> - case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of - {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; - _ -> none - end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; - undefined -> {none, Arguments} - end. - binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1328,7 +1288,9 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}, - #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + State = #ch{protocol = Protocol, writer_pid = WriterPid}, + Reason) -> + ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, @@ -1353,7 +1315,7 @@ reject(DeliveryTag, Requeue, Multiple, reject(Requeue, Acked, Limiter) -> foreach_per_queue( fun (QPid, MsgIds) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self()) end, Acked), ok = notify_limiter(Limiter, Acked). @@ -1456,15 +1418,6 @@ foreach_per_queue(F, UAL) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). -maybe_limit_queues(OldLimiter, NewLimiter, State) -> - case ((not rabbit_limiter:is_active(OldLimiter)) andalso - rabbit_limiter:is_active(NewLimiter)) of - true -> Queues = consumer_queues(State#ch.consumer_mapping), - rabbit_amqqueue:activate_limit_all(Queues, self()); - false -> ok - end, - State. - consumer_queues(Consumers) -> lists:usort([QPid || {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). @@ -1476,7 +1429,7 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> %% optimisation: avoid the potentially expensive 'foldl' in the %% common case. - case rabbit_limiter:is_prefetch_limited(Limiter) of + case rabbit_limiter:is_active(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 @@ -1487,18 +1440,19 @@ notify_limiter(Limiter, Acked) -> end. deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - msg_seq_no = undefined, mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, + mandatory = Mandatory, + confirm = Confirm, msg_seq_no = MsgSeqNo}, DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> Qs = rabbit_amqqueue:lookup(DelQNames), - {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery), %% The pmon:monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean @@ -1517,31 +1471,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ false -> dict:store(QPid, QName, QNames0) end, pmon:monitor(QPid, QMons0)} end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), - State1 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, - State#ch{queue_names = QNames1, - queue_monitors = QMons1}), + State1 = State#ch{queue_names = QNames1, + queue_monitors = QMons1}, + %% NB: the order here is important since basic.returns must be + %% sent before confirms. + State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo, + Message, State1), + State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, + XName, State2), ?INCR_STATS([{exchange_stats, XName, 1} | [{queue_exchange_stats, {QName, XName}, 1} || QPid <- DeliveredQPids, {ok, QName} <- [dict:find(QPid, QNames1)]]], - publish, State1), - State1. + publish, State3), + State3. + +process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> + State; +process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State, no_route), + State; +process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg, + State#ch.mandatory)}. -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_confirm(false, _, _MsgSeqNo, _XName, State) -> State; -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> +process_routing_confirm(true, [], MsgSeqNo, XName, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> +process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, - State#ch.unconfirmed)}; -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State, no_route), - ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State), - case MsgSeqNo of - undefined -> State; - _ -> record_confirms([{MsgSeqNo, XName}], State) - end. + State#ch.unconfirmed)}. send_nacks([], State) -> State; @@ -1644,8 +1604,6 @@ i(state, #ch{state = running}) -> credit_flow:state(); i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); -i(client_flow_blocked, #ch{limiter = Limiter}) -> - rabbit_limiter:is_blocked(Limiter); i(Item, _) -> throw({bad_argument, Item}). @@ -1666,8 +1624,7 @@ update_measures(Type, Key, Inc, Measure) -> end, put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)). -emit_stats(State) -> - emit_stats(State, []). +emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> Coarse = infos(?STATISTICS_KEYS, State), diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 5d1665e0..2bd22579 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -22,7 +22,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([intercept_method/1]). +-export([intercept_method/2]). -ifdef(use_specs). @@ -32,7 +32,7 @@ -callback description() -> [proplists:property()]. --callback intercept(original_method()) -> +-callback intercept(original_method(), rabbit_types:vhost()) -> rabbit_types:ok_or_error2(processed_method(), any()). %% Whether the interceptor wishes to intercept the amqp method @@ -43,7 +43,7 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {intercept, 1}, {applies_to, 1}]; + [{description, 0}, {intercept, 2}, {applies_to, 1}]; behaviour_info(_Other) -> undefined. @@ -51,15 +51,15 @@ behaviour_info(_Other) -> %%---------------------------------------------------------------------------- -intercept_method(#'basic.publish'{} = M) -> +intercept_method(#'basic.publish'{} = M, _VHost) -> M; -intercept_method(M) -> - intercept_method(M, select(rabbit_misc:method_record_type(M))). +intercept_method(M, VHost) -> + intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))). -intercept_method(M, []) -> +intercept_method(M, _VHost, []) -> M; -intercept_method(M, [I]) -> - case I:intercept(M) of +intercept_method(M, VHost, [I]) -> + case I:intercept(M, VHost) of {ok, M2} -> case validate_method(M, M2) of true -> @@ -74,7 +74,7 @@ intercept_method(M, [I]) -> internal_error("Interceptor: ~p failed with reason: ~p", [I, Reason]) end; -intercept_method(M, Is) -> +intercept_method(M, _VHost, Is) -> internal_error("More than one interceptor for method: ~p -- ~p", [rabbit_misc:method_record_type(M), Is]). diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 640b282e..b8a2cc9c 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -34,7 +34,7 @@ publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, undefined), + Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), {Queues, Cycles} = detect_cycles(Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index ab8c62fe..447cd893 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -87,7 +87,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, - {ok, _RoutingRes, _DeliveredQPids} = + {ok, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index d59641b0..9421b52e 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -79,6 +79,25 @@ init_file(File, PrevHandler) -> %% filter out "application: foo; exited: stopped; type: temporary" handle_event({info_report, _, {_, std_info, _}}, State) -> {ok, State}; +%% When a node restarts quickly it is possible the rest of the cluster +%% will not have had the chance to remove its queues from +%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes +%% on_node_down(node()). But before we get there we can receive lots +%% of messages intended for the old version of the node. The emulator +%% logs an event for every one of those messages; in extremis this can +%% bring the server to its knees just logging "Discarding..." +%% again and again. So just log the first one, then go silent. +handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, + State) -> + case get(discarding_message_seen) of + true -> {ok, State}; + undefined -> put(discarding_message_seen, true), + error_logger_file_h:handle_event(Event, State) + end; +%% Clear this state if we log anything else (but not a progress report). +handle_event(Event = {info_msg, _, _}, State) -> + erase(discarding_message_seen), + error_logger_file_h:handle_event(Event, State); handle_event(Event, State) -> error_logger_file_h:handle_event(Event, State). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index da728033..d37b356c 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -17,8 +17,7 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level %% flow control mechanisms, specifically AMQP 0-9-1's basic.qos -%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer) -%% credit mechanism. +%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with @@ -65,11 +64,9 @@ %% %% 1. Channels tell the limiter about basic.qos prefetch counts - %% that's what the limit_prefetch/3, unlimit_prefetch/1, -%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are -%% about - and channel.flow blocking - that's what block/1, -%% unblock/1 and is_blocked/1 are for. They also tell the limiter -%% queue state (via the queue) about consumer credit changes - -%% that's what credit/5 is for. +%% get_prefetch_limit/1 API functions are about. They also tell the +%% limiter queue state (via the queue) about consumer credit +%% changes - that's what credit/5 is for. %% %% 2. Queues also tell the limiter queue state about the queue %% becoming empty (via drained/1) and consumers leaving (via @@ -83,12 +80,11 @@ %% %% 5. Queues ask the limiter for permission (with can_send/3) whenever %% they want to deliver a message to a channel. The limiter checks -%% whether a) the channel isn't blocked by channel.flow, b) the -%% volume has not yet reached the prefetch limit, and c) whether -%% the consumer has enough credit. If so it increments the volume -%% and tells the queue to proceed. Otherwise it marks the queue as -%% requiring notification (see below) and tells the queue not to -%% proceed. +%% whether a) the volume has not yet reached the prefetch limit, +%% and b) whether the consumer has enough credit. If so it +%% increments the volume and tells the queue to proceed. Otherwise +%% it marks the queue as requiring notification (see below) and +%% tells the queue not to proceed. %% %% 6. A queue that has been told to proceed (by the return value of %% can_send/3) sends the message to the channel. Conversely, a @@ -123,8 +119,7 @@ -export([start_link/1]). %% channel API --export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, - is_prefetch_limited/1, is_blocked/1, is_active/1, +-export([new/1, limit_prefetch/3, unlimit_prefetch/1, is_active/1, get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, @@ -136,14 +131,13 @@ %%---------------------------------------------------------------------------- --record(lstate, {pid, prefetch_limited, blocked}). +-record(lstate, {pid, prefetch_limited}). -record(qstate, {pid, state, credits}). -ifdef(use_specs). -type(lstate() :: #lstate{pid :: pid(), - prefetch_limited :: boolean(), - blocked :: boolean()}). + prefetch_limited :: boolean()}). -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). @@ -154,10 +148,6 @@ -spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) -> lstate()). -spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). --spec(block/1 :: (lstate()) -> lstate()). --spec(unblock/1 :: (lstate()) -> lstate()). --spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). --spec(is_blocked/1 :: (lstate()) -> boolean()). -spec(is_active/1 :: (lstate()) -> boolean()). -spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()). -spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). @@ -183,7 +173,6 @@ -record(lim, {prefetch_count = 0, ch_pid, - blocked = false, queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -201,7 +190,7 @@ start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. ok = gen_server:call(Pid, {new, self()}, infinity), - #lstate{pid = Pid, prefetch_limited = false, blocked = false}. + #lstate{pid = Pid, prefetch_limited = false}. limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> ok = gen_server:call( @@ -213,19 +202,7 @@ unlimit_prefetch(L) -> ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity), L#lstate{prefetch_limited = false}. -block(L) -> - ok = gen_server:call(L#lstate.pid, block, infinity), - L#lstate{blocked = true}. - -unblock(L) -> - ok = gen_server:call(L#lstate.pid, unblock, infinity), - L#lstate{blocked = false}. - -is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. - -is_blocked(#lstate{blocked = Blocked}) -> Blocked. - -is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). +is_active(#lstate{prefetch_limited = Limited}) -> Limited. get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; get_prefetch_limit(L) -> @@ -349,19 +326,10 @@ handle_call(unlimit_prefetch, _From, State) -> {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, volume = 0})}; -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call(unblock, _From, State) -> - {reply, ok, maybe_notify(State, State#lim{blocked = false})}; - handle_call(get_prefetch_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; -handle_call({can_send, QPid, _AckRequired}, _From, - State = #lim{blocked = true}) -> - {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case prefetch_limit_reached(State) of @@ -397,8 +365,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso - not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of + case prefetch_limit_reached(OldState) andalso + not prefetch_limit_reached(NewState) of true -> notify_queues(NewState); false -> NewState end. @@ -406,8 +374,6 @@ maybe_notify(OldState, NewState) -> prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. -blocked(#lim{blocked = Blocked}) -> Blocked. - remember_queue(QPid, State = #lim{queues = Queues}) -> case orddict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4f50e1a5..9ce5afcb 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). @@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 908e4783..bea7e0d0 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,7 +17,7 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/2, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, @@ -57,7 +57,6 @@ -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). --type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. -spec new() -> state(). @@ -68,8 +67,7 @@ -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - credit_args(), rabbit_framing:amqp_table(), boolean(), - state()) -> state(). + rabbit_framing:amqp_table(), boolean(), state()) -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). -spec erase_ch(ch(), state()) -> @@ -120,8 +118,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, - IsEmpty, State = #state{consumers = Consumers}) -> +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, + State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -129,34 +127,34 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, false -> Limiter end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, - update_ch_record(case CreditArgs of + update_ch_record(case parse_credit_args(Args) of none -> C1; {Crd, Drain} -> credit_and_drain( - C1, ConsumerTag, Crd, Drain, IsEmpty) + C1, CTag, Crd, Drain, IsEmpty) end), - Consumer = #consumer{tag = ConsumerTag, + Consumer = #consumer{tag = CTag, ack_required = not NoAck, - args = OtherArgs}, + args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. -remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> +remove(ChPid, CTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> not_found; C = #cr{consumer_count = Count, limiter = Limiter, blocked_consumers = Blocked} -> - Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + Blocked1 = remove_consumer(ChPid, CTag, Blocked), Limiter1 = case Count of 1 -> rabbit_limiter:deactivate(Limiter); _ -> Limiter end, - Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag), update_ch_record(C#cr{consumer_count = Count - 1, limiter = Limiter2, blocked_consumers = Blocked1}), State#state{consumers = - remove_consumer(ChPid, ConsumerTag, Consumers)} + remove_consumer(ChPid, CTag, Consumers)} end. erase_ch(ChPid, State = #state{consumers = Consumers}) -> @@ -215,14 +213,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> end. deliver_to_consumer(FetchFun, - #consumer{tag = ConsumerTag, + #consumer{tag = CTag, ack_required = AckRequired}, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, QName) -> {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of true -> queue:in(AckTag, ChAckTags); @@ -324,6 +322,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) -> %%---------------------------------------------------------------------------- +parse_credit_args(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; @@ -399,9 +407,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> end, priority_queue:in({ChPid, Consumer}, Priority, Queue). -remove_consumer(ChPid, ConsumerTag, Queue) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) +remove_consumer(ChPid, CTag, Queue) -> + priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= CTag) end, Queue). remove_consumers(ChPid, Queue) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 8553e36d..64debcab 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -285,8 +285,11 @@ recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> throw({become, F(Deb, Buf, BufLen, State)}); recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> - ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); + case rabbit_net:setopts(Sock, [{active, once}]) of + ok -> mainloop(Deb, Buf, BufLen, + State#v1{pending_recv = true}); + {error, Reason} -> stop(Reason, State) + end; recvloop(Deb, [B], _BufLen, State) -> {Rest, State1} = handle_input(State#v1.callback, B, State), recvloop(Deb, [Rest], size(Rest), State1); @@ -312,11 +315,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> closed when State#v1.connection_state =:= closed -> ok; closed -> - maybe_emit_stats(State), - throw(connection_closed_abruptly); + stop(closed, State); {error, Reason} -> - maybe_emit_stats(State), - throw({inet_error, Reason}); + stop(Reason, State); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, ?MODULE, Deb, {Buf, BufLen, State}); @@ -327,6 +328,11 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> end end. +stop(closed, State) -> maybe_emit_stats(State), + throw(connection_closed_abruptly); +stop(Reason, State) -> maybe_emit_stats(State), + throw({inet_error, Reason}). + handle_other({conserve_resources, Source, Conserve}, State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index af2a6e86..2d6ff73b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1172,7 +1172,7 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, [], undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -2410,8 +2410,8 @@ publish_and_confirm(Q, Payload, Count) -> <<>>, #'P_basic'{delivery_mode = 2}, Payload), Delivery = #delivery{mandatory = false, sender = self(), - message = Msg, msg_seq_no = Seq}, - {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) + confirm = true, message = Msg, msg_seq_no = Seq}, + _QPids = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index d0dcaa71..b08a9a1c 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -88,9 +88,9 @@ trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, ok; trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}}, RKPrefix, RKSuffix, Extra) -> - {ok, _, _} = rabbit_basic:publish( - X, <<RKPrefix/binary, ".", RKSuffix/binary>>, - #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), + {ok, _} = rabbit_basic:publish( + X, <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), ok. msg_to_table(#basic_message{exchange_name = #resource{name = XName}, |