diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-01 16:35:05 +0000 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-01 16:35:05 +0000 |
commit | 2ae51cb5ef9251d63f8864ad606b6630141c64c3 (patch) | |
tree | ec484bde77e2c5f33ece957b93b3207769cd79fd | |
parent | 1df635925ca8623633b1f0a2256543b47ca14e1e (diff) | |
parent | dfe8f0c6afdbb8ae4f4eafb64aac3df7c0619d4b (diff) | |
download | rabbitmq-server-2ae51cb5ef9251d63f8864ad606b6630141c64c3.tar.gz |
Merge default.
-rw-r--r-- | src/rabbit_channel.erl | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d8f55085..d6c9a51c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,9 +33,9 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_acks, - user, virtual_host, most_recently_declared_queue, queue_monitors, + limiter, tx_status, next_tag, unacked_message_q, + uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -191,6 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), uncommitted_acks = [], + uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -678,7 +679,7 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, requeue = Requeue}, _, State) -> - reject(DeliveryTag, Requeue, Multiple, State); + reject(DeliveryTag, Multiple, Requeue, State); handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -879,7 +880,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> - reject(DeliveryTag, Requeue, false, State); + reject(DeliveryTag, false, Requeue, State); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -1084,9 +1085,13 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL}) -> + uncommitted_acks = TAL, + uncommitted_nacks = TNL, + limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), ack(TAL, State1), + lists:foreach( + fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> @@ -1094,8 +1099,10 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL}) -> - UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))), + uncommitted_acks = TAL, + uncommitted_nacks = TNL}) -> + TNL1 = lists:append([L || {_, L} <- TNL]), + UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> @@ -1259,14 +1266,26 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). -reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> +reject(DeliveryTag, Multiple, Requeue, + State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, + case TxStatus of + none -> + reject(Requeue, Acked, State1#ch.limiter), + State1; + in_progress -> + State1#ch{uncommitted_nacks = + [{Requeue, Acked} | State1#ch.uncommitted_nacks]} + end}. + +reject(Requeue, Acked, Limiter) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}. + ok = notify_limiter(Limiter, Acked). ack_record(DeliveryTag, ConsumerTag, _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> @@ -1305,7 +1324,8 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = []}. + uncommitted_acks = [], + uncommitted_nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; |