diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-03-20 11:08:28 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-03-20 11:08:28 +0000 |
commit | 904c58e4fcce4938c6a8725f484e2b78813c6986 (patch) | |
tree | c7db7336a88d46ad5216ea5cee8a8b604701f9e1 /src/rabbit_channel.erl | |
parent | 964f43befe1a48e5256ee1bfb77bc6b7422e4c05 (diff) | |
parent | d8baacd25557f3bee697dd867bd3c32e1d1ba874 (diff) | |
download | rabbitmq-server-904c58e4fcce4938c6a8725f484e2b78813c6986.tar.gz |
stable to default
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 522 |
1 files changed, 284 insertions, 238 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a715b291..792a06c9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,10 +33,10 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - conn_name, limiter, tx_status, next_tag, unacked_message_q, - uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, - virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, delivering_queues, + conn_name, limiter, tx, next_tag, unacked_message_q, user, + virtual_host, most_recently_declared_queue, + queue_names, queue_monitors, consumer_mapping, + blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -64,6 +64,12 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INCR_STATS(Incs, Measure, State), + case rabbit_event:stats_level(State, #ch.stats_timer) of + fine -> incr_stats(Incs, Measure); + _ -> ok + end). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -185,15 +191,13 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, conn_pid = ConnPid, conn_name = ConnName, limiter = Limiter, - tx_status = none, + tx = none, next_tag = 1, unacked_message_q = queue:new(), - uncommitted_message_q = queue:new(), - uncommitted_acks = [], - uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), @@ -314,9 +318,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); + handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), - noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). + Timeout = case C of [] -> hibernate; _ -> 0 end, + %% NB: don't call noreply/1 since we don't want to send confirms. + {noreply, ensure_stats_timer(State1), Timeout}. handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), @@ -327,8 +334,10 @@ handle_info(timeout, State) -> handle_info(emit_stats, State) -> emit_stats(State), - noreply([ensure_stats_timer], - rabbit_event:reset_stats_timer(State, #ch.stats_timer)); + State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer), + %% NB: don't call noreply/1 since we don't want to kick off the + %% stats timer. + {noreply, send_confirms(State1), hibernate}; handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), @@ -336,9 +345,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), - erase_queue_stats(QPid), - noreply(State4#ch{queue_monitors = pmon:erase( - QPid, State4#ch.queue_monitors)}); + #ch{queue_names = QNames, queue_monitors = QMons} = State4, + case dict:find(QPid, QNames) of + {ok, QName} -> erase_queue_stats(QName); + error -> ok + end, + noreply(State4#ch{queue_names = dict:erase(QPid, QNames), + queue_monitors = pmon:erase(QPid, QMons)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -368,30 +381,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> reply(Reply, [], NewState). - -reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate). - -reply(Reply, Mask, NewState, Timeout) -> - {reply, Reply, next_state(Mask, NewState), Timeout}. +reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. -noreply(NewState) -> noreply([], NewState). +noreply(NewState) -> {noreply, next_state(NewState), hibernate}. -noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate). - -noreply(Mask, NewState, Timeout) -> - {noreply, next_state(Mask, NewState), Timeout}. - --define(MASKED_CALL(Fun, Mask, State), - case lists:member(Fun, Mask) of - true -> State; - false -> Fun(State) - end). - -next_state(Mask, State) -> - State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State), - State2 = ?MASKED_CALL(send_confirms, Mask, State1), - State2. +next_state(State) -> ensure_stats_timer(send_confirms(State)). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats). @@ -425,8 +419,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol, {stop, normal, State1} end. +-ifdef(use_specs). +-spec(precondition_failed/1 :: (string()) -> no_return()). +-endif. precondition_failed(Format) -> precondition_failed(Format, []). +-ifdef(use_specs). +-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()). +-endif. precondition_failed(Format, Params) -> rabbit_misc:protocol_error(precondition_failed, Format, Params). @@ -443,15 +443,13 @@ check_resource_access(User, Resource, Perm) -> undefined -> []; Other -> Other end, - CacheTail = - case lists:member(V, Cache) of - true -> lists:delete(V, Cache); - false -> ok = rabbit_access_control:check_resource_access( - User, Resource, Perm), - lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1) - end, - put(permission_cache, [V | CacheTail]), - ok. + case lists:member(V, Cache) of + true -> ok; + false -> ok = rabbit_access_control:check_resource_access( + User, Resource, Perm), + CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), + put(permission_cache, [V | CacheTail]) + end. clear_permission_cache() -> erase(permission_cache), @@ -530,16 +528,12 @@ check_not_default_exchange(_) -> %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% -%% One, quite reasonable, interpretation of the spec, taken by the -%% QPid M1 Java client, is that the exclusion of "amq." prefixed names +%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names %% only applies on actual creation, and not in the cases where the -%% entity already exists. This is how we use this function in the code -%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly -%% 0-9SP1, making it illegal to attempt to declare an exchange/queue -%% with an amq.* name when passive=false. So this will need -%% revisiting. +%% entity already exists or passive=true. %% -%% TODO: enforce other constraints on name. See AMQP JIRA 69. +%% NB: We deliberately do not enforce the other constraints on names +%% required by the spec. check_name(Kind, NameBin = <<"amq.", _/binary>>) -> rabbit_misc:protocol_error( access_refused, @@ -550,18 +544,16 @@ check_name(_Kind, NameBin) -> queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case sets:is_element(QPid, Blocking) of false -> State; - true -> Blocking1 = sets:del_element(QPid, Blocking), - case sets:size(Blocking1) of - 0 -> ok = send(#'channel.flow_ok'{active = false}, State); - _ -> ok - end, - State#ch{blocking = Blocking1} + true -> maybe_send_flow_ok( + State#ch{blocking = sets:del_element(QPid, Blocking)}) end. -record_confirm(undefined, _, State) -> - State; -record_confirm(MsgSeqNo, XName, State) -> - record_confirms([{MsgSeqNo, XName}], State). +maybe_send_flow_ok(State = #ch{blocking = Blocking}) -> + case sets:size(Blocking) of + 0 -> ok = send(#'channel.flow_ok'{active = false}, State); + _ -> ok + end, + State. record_confirms([], State) -> State; @@ -597,6 +589,15 @@ handle_method(_Method, _, State = #ch{state = closing}) -> handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> {ok, State1} = notify_queues(State), + %% We issue the channel.close_ok response after a handshake with + %% the reader, the other half of which is ready_for_close. That + %% way the reader forgets about the channel before we send the + %% response (and this channel process terminates). If we didn't do + %% that, a channel.open for the same channel number, which a + %% client is entitled to send as soon as it has received the + %% close_ok, might be received by the reader before it has seen + %% the termination and hence be sent to the old, now dead/dying + %% channel process, instead of a new process, and thus lost. ReaderPid ! {channel_closing, self()}, {noreply, State1}; @@ -604,8 +605,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> %% while waiting for the reply to a synchronous command, we generally %% do allow this...except in the case of a pending tx.commit, where %% it could wreak havoc. -handle_method(_Method, _, #ch{tx_status = TxStatus}) - when TxStatus =/= none andalso TxStatus =/= in_progress -> +handle_method(_Method, _, #ch{tx = Tx}) + when Tx =:= committing orelse Tx =:= failed -> rabbit_misc:protocol_error( channel_error, "unexpected command while processing 'tx.commit'", []); @@ -619,7 +620,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, - tx_status = TxStatus, + tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -633,23 +634,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_user_id_header(Props, State), check_expiration_header(Props), {MsgSeqNo, State1} = - case {TxStatus, ConfirmEnabled} of + case {Tx, ConfirmEnabled} of {none, false} -> {undefined, State}; {_, _} -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_trace_in(Message, TraceState), + rabbit_trace:tap_in(Message, TraceState), Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), - {noreply, - case TxStatus of - none -> deliver_to_queues({Delivery, QNames}, State1); - in_progress -> TMQ = State1#ch.uncommitted_message_q, - NewTMQ = queue:in({Delivery, QNames}, TMQ), - State1#ch{uncommitted_message_q = NewTMQ} - end}; + DQ = {Delivery, QNames}, + {noreply, case Tx of + none -> deliver_to_queues(DQ, State1); + {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), + State1#ch{tx = {Msgs1, Acks}} + end}; {error, Reason} -> precondition_failed("invalid message: ~p", [Reason]) end; @@ -662,16 +662,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> + _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, - {noreply, - case TxStatus of - none -> ack(Acked, State1), - State1; - in_progress -> State1#ch{uncommitted_acks = - Acked ++ State1#ch.uncommitted_acks} - end}; + {noreply, case Tx of + none -> ack(Acked, State1), + State1; + {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks), + State1#ch{tx = {Msgs, Acks1}} + end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, @@ -684,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -696,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - State1 = monitor_delivering_queue(NoAck, QPid, State), + State1 = monitor_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} @@ -735,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), State1 = monitor_delivering_queue( - NoAck, QPid, State#ch{consumer_mapping = CM1}), + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -822,14 +822,12 @@ handle_method(#'basic.recover_async'{requeue = true}, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_misc:with_exit_handler( - OkFun, fun () -> - rabbit_amqqueue:requeue( - QPid, MsgIds, self()) - end) - end, ok, UAMQL), + foreach_per_queue( + fun (QPid, MsgIds) -> + rabbit_misc:with_exit_handler( + OkFun, + fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end) + end, lists:reverse(UAMQL)), ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method @@ -1044,34 +1042,34 @@ handle_method(#'queue.purge'{queue = QueueNameBin, handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> precondition_failed("cannot switch from confirm to tx mode"); +handle_method(#'tx.select'{}, _, State = #ch{tx = none}) -> + {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}}; + handle_method(#'tx.select'{}, _, State) -> - {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; + {reply, #'tx.select_ok'{}, State}; -handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> +handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); -handle_method(#'tx.commit'{}, _, - State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL, - uncommitted_nacks = TNL, - limiter = Limiter}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), - ack(TAL, State1), - lists:foreach( - fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), - {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; - -handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> +handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks}, + limiter = Limiter}) -> + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), + lists:foreach(fun ({ack, A}) -> ack(A, State1); + ({Requeue, A}) -> reject(Requeue, A, Limiter) + end, lists:reverse(Acks)), + {noreply, maybe_complete_tx(State1#ch{tx = committing})}; + +handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL, - uncommitted_nacks = TNL}) -> - TNL1 = lists:append([L || {_, L} <- TNL]), - UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), - {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; + tx = {_Msgs, Acks}}) -> + AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])), + UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), + {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, + tx = new_tx()}}; -handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> +handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) -> precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> @@ -1096,12 +1094,9 @@ handle_method(#'channel.flow'{active = false}, _, end, State1 = State#ch{limiter = Limiter1}, ok = rabbit_limiter:block(Limiter1), - case consumer_queues(Consumers) of - [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)}, - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State2} - end; + QPids = consumer_queues(Consumers), + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})}; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -1130,9 +1125,12 @@ consumer_monitor(ConsumerTag, State end. -monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons, - delivering_queues = DQ}) -> - State#ch{queue_monitors = pmon:monitor(QPid, QMons), +monitor_delivering_queue(NoAck, QPid, QName, + State = #ch{queue_names = QNames, + queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_names = dict:store(QPid, QName, QNames), + queue_monitors = pmon:monitor(QPid, QMons), delivering_queues = case NoAck of true -> DQ; false -> sets:add_element(QPid, DQ) @@ -1196,6 +1194,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(DestinationName)]); + {error, {binding_invalid, Fmt, Args}} -> + rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) @@ -1215,42 +1215,40 @@ basic_return(#basic_message{exchange_name = ExchangeName, Content). reject(DeliveryTag, Requeue, Multiple, - State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> + State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, - {noreply, - case TxStatus of - none -> - reject(Requeue, Acked, State1#ch.limiter), - State1; - in_progress -> - State1#ch{uncommitted_nacks = - [{Requeue, Acked} | State1#ch.uncommitted_nacks]} - end}. - + {noreply, case Tx of + none -> reject(Requeue, Acked, State1#ch.limiter), + State1; + {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks), + State1#ch{tx = {Msgs, Acks1}} + end}. + +%% NB: Acked is in youngest-first order reject(Requeue, Acked, Limiter) -> - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) - end, ok, Acked), + foreach_per_queue( + fun (QPid, MsgIds) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, Acked), ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, - Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State); + true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); false -> ok end, - rabbit_trace:tap_trace_out(Msg, TraceState), + rabbit_trace:tap_out(Msg, TraceState), UAMQ1 = case AckRequired of true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, UAMQ); @@ -1258,40 +1256,61 @@ record_sent(ConsumerTag, AckRequired, end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. +%% NB: returns acks in youngest-first order collect_acks(Q, 0, true) -> - {queue:to_list(Q), queue:new()}; + {lists:reverse(queue:to_list(Q)), queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> - collect_acks([], queue:new(), Q, DeliveryTag, Multiple). + collect_acks([], [], Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case queue:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> - {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)}; + {[UnackedMsg | ToAcc], + case PrefixAcc of + [] -> QTail; + _ -> queue:join( + queue:from_list(lists:reverse(PrefixAcc)), + QTail) + end}; Multiple -> collect_acks([UnackedMsg | ToAcc], PrefixAcc, QTail, DeliveryTag, Multiple); true -> - collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc), + collect_acks(ToAcc, [UnackedMsg | PrefixAcc], QTail, DeliveryTag, Multiple) end; {empty, _} -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. -ack(Acked, State) -> - Incs = fold_per_queue( - fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{queue_stats, QPid, length(MsgIds)} | L] - end, [], Acked), - ok = notify_limiter(State#ch.limiter, Acked), - incr_stats(Incs, ack, State). - -new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = [], - uncommitted_nacks = []}. +%% NB: Acked is in youngest-first order +ack(Acked, State = #ch{queue_names = QNames}) -> + foreach_per_queue( + fun (QPid, MsgIds) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + ?INCR_STATS(case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count}]; + error -> [] + end, ack, State) + end, Acked), + ok = notify_limiter(State#ch.limiter, Acked). + +%% {Msgs, Acks} +%% +%% Msgs is a queue. +%% +%% Acks looks s.t. like this: +%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...] +%% +%% Each element is a pair consisting of a tag and a list of +%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true' +%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as +%% well as the list overall, are in "most-recent (generally youngest) +%% ack first" order. +new_tx() -> {queue:new(), []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1301,15 +1320,17 @@ notify_queues(State = #ch{consumer_mapping = Consumers, sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. -fold_per_queue(_F, Acc, []) -> - Acc; -fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case - F(QPid, [MsgId], Acc); -fold_per_queue(F, Acc, UAL) -> +foreach_per_queue(_F, []) -> + ok; +foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId]); +%% NB: UAL should be in youngest-first order; the tree values will +%% then be in oldest-first order +foreach_per_queue(F, UAL) -> T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons(QPid, MsgId, T) end, gb_trees:empty(), UAL), - rabbit_misc:gb_trees_fold(F, Acc, T). + rabbit_misc:gb_trees_foreach(F, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1332,65 +1353,93 @@ notify_limiter(Limiter, Acked) -> case rabbit_limiter:is_enabled(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 + ({_, _, _}, Acc) -> Acc + 1 end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(Limiter, Count) end end. +deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, + msg_seq_no = undefined, + mandatory = false}, + []}, State) -> %% optimisation + ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), + State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, - QNames}, State) -> - {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = State#ch{queue_monitors = - pmon:monitor_all(DeliveredQPids, - State#ch.queue_monitors)}, - State2 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State1), - incr_stats([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - State2. + DelQNames}, State = #ch{queue_names = QNames, + queue_monitors = QMons}) -> + Qs = rabbit_amqqueue:lookup(DelQNames), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + %% The pmon:monitor_all/2 monitors all queues to which we + %% delivered. But we want to monitor even queues we didn't deliver + %% to, since we need their 'DOWN' messages to clean + %% queue_names. So we also need to monitor each QPid from + %% queues. But that only gets the masters (which is fine for + %% cleaning queue_names), so we need the union of both. + %% + %% ...and we need to add even non-delivered queues to queue_names + %% since alternative algorithms to update queue_names less + %% frequently would in fact be more expensive in the common case. + {QNames1, QMons1} = + lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, + {QNames0, QMons0}) -> + {case dict:is_key(QPid, QNames0) of + true -> QNames0; + false -> dict:store(QPid, QName, QNames0) + end, pmon:monitor(QPid, QMons0)} + end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1}), + ?INCR_STATS([{exchange_stats, XName, 1} | + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State1), + State1. -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State, no_route), - incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}], - return_unroutable, State), - record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> - record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> State; +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> + record_confirms([{MsgSeqNo, XName}], State); process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, - State#ch.unconfirmed)}. + State#ch.unconfirmed)}; +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State, no_route), + ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State), + case MsgSeqNo of + undefined -> State; + _ -> record_confirms([{MsgSeqNo, XName}], State) + end. send_nacks([], State) -> State; -send_nacks(_MXs, State = #ch{state = closing, - tx_status = none}) -> %% optimisation +send_nacks(_MXs, State = #ch{state = closing, + tx = none}) -> %% optimisation State; -send_nacks(MXs, State = #ch{tx_status = none}) -> +send_nacks(MXs, State = #ch{tx = none}) -> coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} end, State); send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation - State#ch{tx_status = failed}; + State#ch{tx = failed}; send_nacks(_, State) -> - maybe_complete_tx(State#ch{tx_status = failed}). + maybe_complete_tx(State#ch{tx = failed}). -send_confirms(State = #ch{tx_status = none, confirmed = []}) -> +send_confirms(State = #ch{tx = none, confirmed = []}) -> State; -send_confirms(State = #ch{tx_status = none, confirmed = C}) -> +send_confirms(State = #ch{tx = none, confirmed = C}) -> MsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> - incr_stats([{exchange_stats, XName, 1}], confirm, State), + ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State), [MsgSeqNo | MSNs] end, [], lists:append(C)), send_confirms(MsgSeqNos, State#ch{confirmed = []}); @@ -1424,7 +1473,12 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss], State. -maybe_complete_tx(State = #ch{tx_status = in_progress}) -> +ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L]; +ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks]. + +ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]). + +maybe_complete_tx(State = #ch{tx = {_, _}}) -> State; maybe_complete_tx(State = #ch{unconfirmed = UC}) -> case dtree:is_empty(UC) of @@ -1432,16 +1486,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) -> true -> complete_tx(State#ch{confirmed = []}) end. -complete_tx(State = #ch{tx_status = committing}) -> +complete_tx(State = #ch{tx = committing}) -> ok = send(#'tx.commit_ok'{}, State), - State#ch{tx_status = in_progress}; -complete_tx(State = #ch{tx_status = failed}) -> + State#ch{tx = new_tx()}; +complete_tx(State = #ch{tx = failed}) -> {noreply, State1} = handle_exception( rabbit_misc:amqp_error( precondition_failed, "partial tx completion", [], 'tx.commit'), State), - State1#ch{tx_status = in_progress}. + State1#ch{tx = new_tx()}. infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1450,19 +1504,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{tx_status = TE}) -> TE =/= none; +i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); -i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> - dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> - dtree:size(UC); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> - queue:len(UAMQ); -i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> - queue:len(TMQ); -i(acks_uncommitted, #ch{uncommitted_acks = TAL}) -> - length(TAL); +i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); +i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); +i(messages_uncommitted, #ch{}) -> 0; +i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); +i(acks_uncommitted, #ch{}) -> 0; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> @@ -1473,12 +1524,8 @@ i(Item, _) -> name(#ch{conn_name = ConnName, channel = Channel}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). -incr_stats(Incs, Measure, State) -> - case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> [update_measures(Type, Key, Inc, Measure) || - {Type, Key, Inc} <- Incs]; - _ -> ok - end. +incr_stats(Incs, Measure) -> + [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs]. update_measures(Type, Key, Inc, Measure) -> Measures = case get({Type, Key}) of @@ -1495,24 +1542,23 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - CoarseStats = infos(?STATISTICS_KEYS, State), + Coarse = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - coarse -> - rabbit_event:notify(channel_stats, Extra ++ CoarseStats); - fine -> - FineStats = - [{channel_queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {channel_exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_queue_exchange_stats, - [{QX, Stats} || - {{queue_exchange_stats, QX}, Stats} <- get()]}], - rabbit_event:notify(channel_stats, - Extra ++ CoarseStats ++ FineStats) + coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse); + fine -> Fine = [{channel_queue_stats, + [{QName, Stats} || + {{queue_stats, QName}, Stats} <- get()]}, + {channel_exchange_stats, + [{XName, Stats} || + {{exchange_stats, XName}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine) end. -erase_queue_stats(QPid) -> - erase({queue_stats, QPid}), +erase_queue_stats(QName) -> + erase({queue_stats, QName}), [erase({queue_exchange_stats, QX}) || - {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), + QName0 =:= QName]. |