diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-07-06 17:36:22 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-07-06 17:36:22 +0100 |
commit | 8f8faabe84201fb68c4644b37ac7de498f3f0d39 (patch) | |
tree | 0b25c22845f8e72dcba0cd9f27a976f9fecf8f42 | |
parent | a6f2674e3fb810bfd8a52d388dde5fa95cdfbb20 (diff) | |
download | rabbitmq-server-8f8faabe84201fb68c4644b37ac7de498f3f0d39.tar.gz |
make 'tx.commit-ok' indicate responsibility transfer
This is now very close to the previous tx semantics.
The downsides w.r.t. the previous state of this branch are
- a bit more code, though only in the channel
- tx and confirm mode no longer compose
- tx always carries the cost of confirms
Implementation notes:
- The channel must remain active while a commit waits for confirms
from queues to trickle in. We achieve this by recording the fact
that we have a pending commit.
- The trigger for sending the commit-ok is that the number of pending
confirms drops to zero and we have a pending commit.
- We check for that condition in three places:
a) at the point of commit (in case the tx contains no publishes or
all confirmations happen as part of delivering the messages to
queues)
b) where we would normally send basic.acks
c) where we would normally send basic.nacks
- we are re-using the same logic/state as for 'proper' confirms,
except we suppress the sending of acks/nacks
- handling the failure case is slightly awkward. We record a tx as
'failed' as soon as we encounter the 'nack' case. Subsequently, when
the aforementioned triggering condition is met instead of sending a
tx.commit-ok we send a precondition_failed error. But we can't just
employ rabbit_misc:protocol_error here since that only works in the
context of handling an AMQP method, which is only the case for
(a). So instead we drop to a slightly lower API level and rely on
the fact precondition_failed is a channel-level, rather than
connection-level error.
-rw-r--r-- | src/rabbit_channel.erl | 116 |
1 files changed, 76 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 81a0ee80..0c211b46 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -30,7 +30,7 @@ prioritise_cast/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, tx_enabled, next_tag, + limiter_pid, start_limiter_fun, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, @@ -174,7 +174,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, conn_pid = ConnPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, - tx_enabled = false, + tx_status = none, next_tag = 1, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), @@ -516,10 +516,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -blind_confirm({#delivery{message = #basic_message{exchange_name = XName}, - msg_seq_no = MsgSeqNo}, _QNames}, State) -> - record_confirm(MsgSeqNo, XName, State). - record_confirm(undefined, _, State) -> State; record_confirm(MsgSeqNo, XName, State) -> @@ -598,6 +594,15 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> ReaderPid ! {channel_closing, self()}, {noreply, State1}; +%% Even though the spec prohibits the client from sending commands +%% while waiting for the reply to a synchronous command, we generally +%% do allow this...except in the case of a pending tx.commit, where +%% it could wreak havoc. +handle_method(_Method, _, #ch{tx_status = TxStatus}) + when TxStatus =/= none andalso TxStatus =/= in_progress -> + rabbit_misc:protocol_error( + channel_error, "unexpected command while processing 'tx.commit'", []); + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -606,7 +611,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, - tx_enabled = TxEnabled, + tx_status = TxStatus, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -618,10 +623,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), {MsgSeqNo, State1} = - case ConfirmEnabled of - false -> {undefined, State}; - true -> SeqNo = State#ch.publish_seqno, - {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + case {TxStatus, ConfirmEnabled} of + {none, false} -> {undefined, State}; + {_, _} -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> @@ -629,13 +634,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), - State2 = case TxEnabled of - true -> TMQ = State1#ch.uncommitted_message_q, - NewTMQ = queue:in({Delivery, QNames}, TMQ), - State1#ch{uncommitted_message_q = NewTMQ}; - false -> deliver_to_queues({Delivery, QNames}, State1) - end, - {noreply, State2}; + {noreply, + case TxStatus of + none -> deliver_to_queues({Delivery, QNames}, State1); + in_progress -> TMQ = State1#ch.uncommitted_message_q, + NewTMQ = queue:in({Delivery, QNames}, TMQ), + State1#ch{uncommitted_message_q = NewTMQ} + end}; {error, Reason} -> rabbit_misc:protocol_error(precondition_failed, "invalid message: ~p", [Reason]) @@ -650,15 +655,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{unacked_message_q = UAMQ, - tx_enabled = TxEnabled}) -> + tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, - {noreply, case TxEnabled of - true -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, - Acked), - State1#ch{uncommitted_ack_q = NewTAQ}; - false -> ack(Acked, State1) - end}; + {noreply, + case TxStatus of + none -> ack(Acked, State1); + in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked), + State1#ch{uncommitted_ack_q = NewTAQ} + end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, @@ -1039,28 +1044,35 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State) -> - {reply, #'tx.select_ok'{}, State#ch{tx_enabled = true}}; + {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; -handle_method(#'tx.commit'{}, _, #ch{tx_enabled = false}) -> +handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, uncommitted_ack_q = TAQ}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), - {reply, #'tx.commit_ok'{}, new_tx(ack(TAQ, State1))}; + State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2, + State, TMQ))), + {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; -handle_method(#'tx.rollback'{}, _, #ch{tx_enabled = false}) -> +handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_message_q = TMQ, uncommitted_ack_q = TAQ}) -> - State1 = rabbit_misc:queue_fold(fun blind_confirm/2, State, TMQ), - {reply, #'tx.rollback_ok'{}, new_tx(State1#ch{unacked_message_q = - queue:join(TAQ, UAMQ)})}; + {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = + queue:join(TAQ, UAMQ)})}; + +handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, @@ -1126,7 +1138,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {Nack, SendFun} = case Reason of normal -> {false, fun record_confirms/2}; - _ -> {true, fun send_nacks/2} + _ -> {true, fun send_nacks/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), erase_queue_stats(QPid), @@ -1348,20 +1360,25 @@ lock_message(false, _MsgStruct, State) -> send_nacks([], State) -> State; -send_nacks(MXs, State) -> +send_nacks(MXs, State = #ch{tx_status = none}) -> MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], coalesce_and_send(MsgSeqNos, fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} - end, State). + end, State); +send_nacks(_, State) -> + maybe_complete_tx(State#ch{tx_status = failed}). -send_confirms(State = #ch{confirmed = C}) -> +send_confirms(State = #ch{tx_status = none, confirmed = C}) -> C1 = lists:append(C), MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), MsgSeqNo end || {MsgSeqNo, ExchangeName} <- C1 ], - send_confirms(MsgSeqNos, State #ch{confirmed = []}). + send_confirms(MsgSeqNos, State #ch{confirmed = []}); +send_confirms(State) -> + maybe_complete_tx(State). + send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> @@ -1391,6 +1408,25 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. +maybe_complete_tx(State = #ch{tx_status = in_progress}) -> + State; +maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> + case gb_trees:is_empty(UMQ) of + false -> State; + true -> complete_tx(State#ch{confirmed = []}) + end. + +complete_tx(State = #ch{tx_status = committing}) -> + ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), + State#ch{tx_status = in_progress}; +complete_tx(State = #ch{tx_status = failed}) -> + {noreply, State1} = send_exception( + rabbit_misc:amqp_error( + precondition_failed, "partial tx completion", [], + 'tx.commit'), + State), + State1#ch{tx_status = in_progress}. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); @@ -1398,7 +1434,7 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{tx_enabled = TE}) -> TE; +i(transactional, #ch{tx_status = TE}) -> TE =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); |