diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-03 14:08:07 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-03 14:08:07 +0000 |
commit | 51ecb93879957bac4c45a1100bc0c0eb76b5d39f (patch) | |
tree | 29f5b245d33293fb0cb8ffc54629ba0993080d3e | |
parent | 581c3e8f8b7c367601c3e92f1be3a86340a44f94 (diff) | |
parent | 6da6dbf02b3791fa5340444d4589bf683487bbeb (diff) | |
download | rabbitmq-server-51ecb93879957bac4c45a1100bc0c0eb76b5d39f.tar.gz |
Merge bug25370
-rw-r--r-- | src/rabbit_channel.erl | 135 |
1 files changed, 64 insertions, 71 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5d143fff..6af0325c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,14 +33,15 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - conn_name, limiter, tx_status, next_tag, unacked_message_q, - uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + conn_name, limiter, tx, next_tag, unacked_message_q, user, virtual_host, most_recently_declared_queue, queue_names, queue_monitors, consumer_mapping, blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). +-record(tx, {msgs, acks, nacks}). + -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(STATISTICS_KEYS, @@ -186,12 +187,9 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, conn_pid = ConnPid, conn_name = ConnName, limiter = Limiter, - tx_status = none, + tx = none, next_tag = 1, unacked_message_q = queue:new(), - uncommitted_message_q = queue:new(), - uncommitted_acks = [], - uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -585,8 +583,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> %% while waiting for the reply to a synchronous command, we generally %% do allow this...except in the case of a pending tx.commit, where %% it could wreak havoc. -handle_method(_Method, _, #ch{tx_status = TxStatus}) - when TxStatus =/= none andalso TxStatus =/= in_progress -> +handle_method(_Method, _, #ch{tx = Tx}) + when Tx =:= committing orelse Tx =:= failed -> rabbit_misc:protocol_error( channel_error, "unexpected command while processing 'tx.commit'", []); @@ -600,7 +598,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, - tx_status = TxStatus, + tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -614,7 +612,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_user_id_header(Props, State), check_expiration_header(Props), {MsgSeqNo, State1} = - case {TxStatus, ConfirmEnabled} of + case {Tx, ConfirmEnabled} of {none, false} -> {undefined, State}; {_, _} -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} @@ -624,12 +622,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_trace:tap_in(Message, TraceState), Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), + DQ = {Delivery, QNames}, {noreply, - case TxStatus of - none -> deliver_to_queues({Delivery, QNames}, State1); - in_progress -> TMQ = State1#ch.uncommitted_message_q, - NewTMQ = queue:in({Delivery, QNames}, TMQ), - State1#ch{uncommitted_message_q = NewTMQ} + case Tx of + none -> deliver_to_queues(DQ, State1); + #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs), + State1#ch{tx = Tx#tx{msgs = Msgs1}} end}; {error, Reason} -> precondition_failed("invalid message: ~p", [Reason]) @@ -643,15 +641,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> + _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, - case TxStatus of - none -> ack(Acked, State1), - State1; - in_progress -> State1#ch{uncommitted_acks = - Acked ++ State1#ch.uncommitted_acks} + case Tx of + none -> ack(Acked, State1), + State1; + #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}} end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -1029,34 +1026,37 @@ handle_method(#'queue.purge'{queue = QueueNameBin, handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> precondition_failed("cannot switch from confirm to tx mode"); +handle_method(#'tx.select'{}, _, State = #ch{tx = none}) -> + {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}}; + handle_method(#'tx.select'{}, _, State) -> - {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; + {reply, #'tx.select_ok'{}, State}; -handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> +handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); -handle_method(#'tx.commit'{}, _, - State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL, - uncommitted_nacks = TNL, - limiter = Limiter}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), - ack(TAL, State1), +handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs, + acks = Acks, + nacks = Nacks}, + limiter = Limiter}) -> + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), + ack(Acks, State1), lists:foreach( - fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), - {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; + fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks), + {noreply, maybe_complete_tx(State1#ch{tx = committing})}; -handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> +handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL, - uncommitted_nacks = TNL}) -> - TNL1 = lists:append([L || {_, L} <- TNL]), - UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), - {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; - -handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> + tx = #tx{acks = Acks, + nacks = Nacks}}) -> + NacksL = lists:append([L || {_, L} <- Nacks]), + UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))), + {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, + tx = new_tx()}}; + +handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) -> precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> @@ -1204,17 +1204,15 @@ basic_return(#basic_message{exchange_name = ExchangeName, Content). reject(DeliveryTag, Requeue, Multiple, - State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> + State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, - case TxStatus of - none -> - reject(Requeue, Acked, State1#ch.limiter), - State1; - in_progress -> - State1#ch{uncommitted_nacks = - [{Requeue, Acked} | State1#ch.uncommitted_nacks]} + case Tx of + none -> reject(Requeue, Acked, State1#ch.limiter), + State1; + #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks], + State1#ch{tx = Tx#tx{nacks = Nacks1}} end}. reject(Requeue, Acked, Limiter) -> @@ -1282,9 +1280,7 @@ ack(Acked, State = #ch{queue_names = QNames}) -> ok = notify_limiter(State#ch.limiter, Acked), incr_stats(Incs, ack, State). -new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = [], - uncommitted_nacks = []}. +new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1382,18 +1378,18 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> send_nacks([], State) -> State; -send_nacks(MXs, State = #ch{tx_status = none}) -> +send_nacks(MXs, State = #ch{tx = none}) -> coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} end, State); send_nacks(_, State) -> - maybe_complete_tx(State#ch{tx_status = failed}). + maybe_complete_tx(State#ch{tx = failed}). -send_confirms(State = #ch{tx_status = none, confirmed = []}) -> +send_confirms(State = #ch{tx = none, confirmed = []}) -> State; -send_confirms(State = #ch{tx_status = none, confirmed = C}) -> +send_confirms(State = #ch{tx = none, confirmed = C}) -> MsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> @@ -1433,7 +1429,7 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -maybe_complete_tx(State = #ch{tx_status = in_progress}) -> +maybe_complete_tx(State = #ch{tx = #tx{}}) -> State; maybe_complete_tx(State = #ch{unconfirmed = UC}) -> case dtree:is_empty(UC) of @@ -1441,16 +1437,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) -> true -> complete_tx(State#ch{confirmed = []}) end. -complete_tx(State = #ch{tx_status = committing}) -> +complete_tx(State = #ch{tx = committing}) -> ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), - State#ch{tx_status = in_progress}; -complete_tx(State = #ch{tx_status = failed}) -> + State#ch{tx = new_tx()}; +complete_tx(State = #ch{tx = failed}) -> {noreply, State1} = handle_exception( rabbit_misc:amqp_error( precondition_failed, "partial tx completion", [], 'tx.commit'), State), - State1#ch{tx_status = in_progress}. + State1#ch{tx = new_tx()}. infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1459,19 +1455,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{tx_status = TE}) -> TE =/= none; +i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); -i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> - dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> - dtree:size(UC); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> - queue:len(UAMQ); -i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> - queue:len(TMQ); -i(acks_uncommitted, #ch{uncommitted_acks = TAL}) -> - length(TAL); +i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); +i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs); +i(messages_uncommitted, #ch{}) -> 0; +i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks); +i(acks_uncommitted, #ch{}) -> 0; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> |