summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-01-24 17:21:48 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-01-24 17:21:48 +0000
commit45dd22501ccfdda72ce089ade71c9442306088a3 (patch)
tree44e72ccad7d6c3bb93ce2a56b75eea8f102b9cbd
parent65efc39ab5e6669c4088e5d26d7885ba252dcf58 (diff)
downloadrabbitmq-server-45dd22501ccfdda72ce089ade71c9442306088a3.tar.gz
Executing basick.{nack,reject} transactionally.
-rw-r--r--src/rabbit_channel.erl51
1 files changed, 36 insertions, 15 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f14b2973..275de0f4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,9 +33,9 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_acks,
- user, virtual_host, most_recently_declared_queue, queue_monitors,
+ limiter, tx_status, next_tag, unacked_message_q,
+ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -185,6 +185,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
uncommitted_acks = [],
+ uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -663,7 +664,7 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
requeue = Requeue},
_, State) ->
- reject(DeliveryTag, Requeue, Multiple, State);
+ reject_tx(DeliveryTag, Multiple, Requeue, State);
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -864,7 +865,9 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
_, State) ->
- reject(DeliveryTag, Requeue, false, State);
+ reject_tx(DeliveryTag, false, Requeue, State);
+
+
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1069,18 +1072,25 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
- {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ State2 = ack(TAL, State1),
+ State3 = lists:foldl(
+ fun ({Requeue, Acked}, S) -> reject(Requeue, Acked, S) end,
+ State2, TNL),
+ State4 = new_tx(State3),
+ {noreply, maybe_complete_tx(State4#ch{tx_status = committing})};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL}) ->
- UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ TNL1 = lists:map(fun ({_, L}) -> L end, TNL),
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
@@ -1266,14 +1276,24 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+reject_tx(DeliveryTag, Multiple, Requeue,
+ State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none -> reject(Requeue, Acked, State);
+ in_progress ->
+ State1#ch{uncommitted_nacks =
+ {Requeue, Acked} ++ State1#ch.uncommitted_nacks}
+ end}.
+
+reject(Requeue, Acked, State) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}}.
+ ok = notify_limiter(State#ch.limiter, Acked).
ack_record(DeliveryTag, ConsumerTag,
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
@@ -1312,7 +1332,8 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = []}.
+ uncommitted_acks = [],
+ uncommitted_nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};