diff options
authorMatthias Radestock <>2011-07-06 17:36:22 +0100
committerMatthias Radestock <>2011-07-06 17:36:22 +0100
commit8f8faabe84201fb68c4644b37ac7de498f3f0d39 (patch)
parenta6f2674e3fb810bfd8a52d388dde5fa95cdfbb20 (diff)
make 'tx.commit-ok' indicate responsibility transfer
This is now very close to the previous tx semantics. The downsides w.r.t. the previous state of this branch are - a bit more code, though only in the channel - tx and confirm mode no longer compose - tx always carries the cost of confirms Implementation notes: - The channel must remain active while a commit waits for confirms from queues to trickle in. We achieve this by recording the fact that we have a pending commit. - The trigger for sending the commit-ok is that the number of pending confirms drops to zero and we have a pending commit. - We check for that condition in three places: a) at the point of commit (in case the tx contains no publishes or all confirmations happen as part of delivering the messages to queues) b) where we would normally send basic.acks c) where we would normally send basic.nacks - we are re-using the same logic/state as for 'proper' confirms, except we suppress the sending of acks/nacks - handling the failure case is slightly awkward. We record a tx as 'failed' as soon as we encounter the 'nack' case. Subsequently, when the aforementioned triggering condition is met instead of sending a tx.commit-ok we send a precondition_failed error. But we can't just employ rabbit_misc:protocol_error here since that only works in the context of handling an AMQP method, which is only the case for (a). So instead we drop to a slightly lower API level and rely on the fact precondition_failed is a channel-level, rather than connection-level error.
1 files changed, 76 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 81a0ee80..0c211b46 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -30,7 +30,7 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_pid, start_limiter_fun, tx_enabled, next_tag,
+ limiter_pid, start_limiter_fun, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
@@ -174,7 +174,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
- tx_enabled = false,
+ tx_status = none,
next_tag = 1,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
@@ -516,10 +516,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
-blind_confirm({#delivery{message = #basic_message{exchange_name = XName},
- msg_seq_no = MsgSeqNo}, _QNames}, State) ->
- record_confirm(MsgSeqNo, XName, State).
record_confirm(undefined, _, State) ->
record_confirm(MsgSeqNo, XName, State) ->
@@ -598,6 +594,15 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
ReaderPid ! {channel_closing, self()},
{noreply, State1};
+%% Even though the spec prohibits the client from sending commands
+%% while waiting for the reply to a synchronous command, we generally
+%% do allow this...except in the case of a pending tx.commit, where
+%% it could wreak havoc.
+handle_method(_Method, _, #ch{tx_status = TxStatus})
+ when TxStatus =/= none andalso TxStatus =/= in_progress ->
+ rabbit_misc:protocol_error(
+ channel_error, "unexpected command while processing 'tx.commit'", []);
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
@@ -606,7 +611,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
- tx_enabled = TxEnabled,
+ tx_status = TxStatus,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -618,10 +623,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(, State),
{MsgSeqNo, State1} =
- case ConfirmEnabled of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case {TxStatus, ConfirmEnabled} of
+ {none, false} -> {undefined, State};
+ {_, _} -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
@@ -629,13 +634,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
QNames = rabbit_exchange:route(Exchange, Delivery),
- State2 = case TxEnabled of
- true -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ};
- false -> deliver_to_queues({Delivery, QNames}, State1)
- end,
- {noreply, State2};
+ {noreply,
+ case TxStatus of
+ none -> deliver_to_queues({Delivery, QNames}, State1);
+ in_progress -> TMQ = State1#ch.uncommitted_message_q,
+ NewTMQ = queue:in({Delivery, QNames}, TMQ),
+ State1#ch{uncommitted_message_q = NewTMQ}
+ end};
{error, Reason} ->
"invalid message: ~p", [Reason])
@@ -650,15 +655,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{unacked_message_q = UAMQ,
- tx_enabled = TxEnabled}) ->
+ tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply, case TxEnabled of
- true -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q,
- Acked),
- State1#ch{uncommitted_ack_q = NewTAQ};
- false -> ack(Acked, State1)
- end};
+ {noreply,
+ case TxStatus of
+ none -> ack(Acked, State1);
+ in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
+ State1#ch{uncommitted_ack_q = NewTAQ}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -1039,28 +1044,35 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
+handle_method(#''{}, _, #ch{confirm_enabled = true}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from confirm to tx mode", []);
handle_method(#''{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_enabled = true}};
+ {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
-handle_method(#'tx.commit'{}, _, #ch{tx_enabled = false}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_ack_q = TAQ}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- {reply, #'tx.commit_ok'{}, new_tx(ack(TAQ, State1))};
+ State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ State, TMQ))),
+ {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
-handle_method(#'tx.rollback'{}, _, #ch{tx_enabled = false}) ->
+handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_message_q = TMQ,
uncommitted_ack_q = TAQ}) ->
- State1 = rabbit_misc:queue_fold(fun blind_confirm/2, State, TMQ),
- {reply, #'tx.rollback_ok'{}, new_tx(State1#ch{unacked_message_q =
- queue:join(TAQ, UAMQ)})};
+ {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
+ queue:join(TAQ, UAMQ)})};
+handle_method(#''{}, _, #ch{tx_status = in_progress}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from tx to confirm mode", []);
handle_method(#''{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1126,7 +1138,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} = case Reason of
normal -> {false, fun record_confirms/2};
- _ -> {true, fun send_nacks/2}
+ _ -> {true, fun send_nacks/2}
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
@@ -1348,20 +1360,25 @@ lock_message(false, _MsgStruct, State) ->
send_nacks([], State) ->
-send_nacks(MXs, State) ->
+send_nacks(MXs, State = #ch{tx_status = none}) ->
MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
- end, State).
+ end, State);
+send_nacks(_, State) ->
+ maybe_complete_tx(State#ch{tx_status = failed}).
-send_confirms(State = #ch{confirmed = C}) ->
+send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
C1 = lists:append(C),
MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []}).
+ send_confirms(MsgSeqNos, State #ch{confirmed = []});
+send_confirms(State) ->
+ maybe_complete_tx(State).
send_confirms([], State) ->
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1391,6 +1408,25 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
+maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+ State;
+maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
+ case gb_trees:is_empty(UMQ) of
+ false -> State;
+ true -> complete_tx(State#ch{confirmed = []})
+ end.
+complete_tx(State = #ch{tx_status = committing}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
+ State#ch{tx_status = in_progress};
+complete_tx(State = #ch{tx_status = failed}) ->
+ {noreply, State1} = send_exception(
+ rabbit_misc:amqp_error(
+ precondition_failed, "partial tx completion", [],
+ 'tx.commit'),
+ State),
+ State1#ch{tx_status = in_progress}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
@@ -1398,7 +1434,7 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_enabled = TE}) -> TE;
+i(transactional, #ch{tx_status = TE}) -> TE =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->