diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-01-24 17:21:48 +0000 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-01-24 17:21:48 +0000 |
commit | 45dd22501ccfdda72ce089ade71c9442306088a3 (patch) | |
tree | 44e72ccad7d6c3bb93ce2a56b75eea8f102b9cbd | |
parent | 65efc39ab5e6669c4088e5d26d7885ba252dcf58 (diff) | |
download | rabbitmq-server-45dd22501ccfdda72ce089ade71c9442306088a3.tar.gz |
Executing basick.{nack,reject} transactionally.
-rw-r--r-- | src/rabbit_channel.erl | 51 |
1 files changed, 36 insertions, 15 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f14b2973..275de0f4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,9 +33,9 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_acks, - user, virtual_host, most_recently_declared_queue, queue_monitors, + limiter, tx_status, next_tag, unacked_message_q, + uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -185,6 +185,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), uncommitted_acks = [], + uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -663,7 +664,7 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, requeue = Requeue}, _, State) -> - reject(DeliveryTag, Requeue, Multiple, State); + reject_tx(DeliveryTag, Multiple, Requeue, State); handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -864,7 +865,9 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> - reject(DeliveryTag, Requeue, false, State); + reject_tx(DeliveryTag, false, Requeue, State); + + handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -1069,18 +1072,25 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL}) -> - State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, - State, TMQ))), - {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; + uncommitted_acks = TAL, + uncommitted_nacks = TNL}) -> + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), + State2 = ack(TAL, State1), + State3 = lists:foldl( + fun ({Requeue, Acked}, S) -> reject(Requeue, Acked, S) end, + State2, TNL), + State4 = new_tx(State3), + {noreply, maybe_complete_tx(State4#ch{tx_status = committing})}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL}) -> - UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))), + uncommitted_acks = TAL, + uncommitted_nacks = TNL}) -> + TNL1 = lists:map(fun ({_, L}) -> L end, TNL), + UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> @@ -1266,14 +1276,24 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). -reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> +reject_tx(DeliveryTag, Multiple, Requeue, + State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, + case TxStatus of + none -> reject(Requeue, Acked, State); + in_progress -> + State1#ch{uncommitted_nacks = + {Requeue, Acked} ++ State1#ch.uncommitted_nacks} + end}. + +reject(Requeue, Acked, State) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}. + ok = notify_limiter(State#ch.limiter, Acked). ack_record(DeliveryTag, ConsumerTag, _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> @@ -1312,7 +1332,8 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = []}. + uncommitted_acks = [], + uncommitted_nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; |