diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-07 11:53:54 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-07 11:53:54 +0000 |
commit | cebfaf63a18858987bd19b0d2dbfa11642392c69 (patch) | |
tree | 5d474874917121bfdd5540a764e72e3acbd9e1d6 | |
parent | 0860b908377d89725fc366095e6c273eae2d691a (diff) | |
download | rabbitmq-server-cebfaf63a18858987bd19b0d2dbfa11642392c69.tar.gz |
get rid of #tx{}
and fix uncommitted_acks info item
-rw-r--r-- | src/rabbit_channel.erl | 77 |
1 files changed, 38 insertions, 39 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cccd09dd..df056a6e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -40,17 +40,6 @@ queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). - --record(tx, {msgs, acks}). %% (1) -%% (1) acks looks s.t. like this: -%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...] -%% -%% Each element is a pair consisting of a tag and a list of -%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true' -%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as -%% well as the list overall, are in "most-recent (generally youngest) -%% ack first" order. - -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(STATISTICS_KEYS, @@ -631,12 +620,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), DQ = {Delivery, QNames}, - {noreply, - case Tx of - none -> deliver_to_queues(DQ, State1); - #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs), - State1#ch{tx = Tx#tx{msgs = Msgs1}} - end}; + {noreply, case Tx of + none -> deliver_to_queues(DQ, State1); + {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), + State1#ch{tx = {Msgs1, Acks}} + end}; {error, Reason} -> precondition_failed("invalid message: ~p", [Reason]) end; @@ -652,13 +640,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, - {noreply, - case Tx of - none -> ack(Acked, State1), - State1; - #tx{acks = Acks} -> Acks1 = ack_cons(ack, Acked, Acks), - State1#ch{tx = Tx#tx{acks = Acks1}} - end}; + {noreply, case Tx of + none -> ack(Acked, State1), + State1; + {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks), + State1#ch{tx = {Msgs, Acks1}} + end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, @@ -1042,8 +1029,7 @@ handle_method(#'tx.select'{}, _, State) -> handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); -handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs, - acks = Acks}, +handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks}, limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), lists:foreach( @@ -1056,13 +1042,13 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - tx = #tx{acks = Acks}}) -> + tx = {_Msgs, Acks}}) -> AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])), UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, tx = new_tx()}}; -handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) -> +handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) -> precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> @@ -1213,13 +1199,12 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, - {noreply, - case Tx of - none -> reject(Requeue, Acked, State1#ch.limiter), - State1; - #tx{acks = Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks), - State1#ch{tx = Tx#tx{acks = Acks1}} - end}. + {noreply, case Tx of + none -> reject(Requeue, Acked, State1#ch.limiter), + State1; + {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks), + State1#ch{tx = {Msgs, Acks1}} + end}. %% NB: Acked is in youngest-first order reject(Requeue, Acked, Limiter) -> @@ -1294,7 +1279,19 @@ ack(Acked, State = #ch{queue_names = QNames}) -> end, Acked), ok = notify_limiter(State#ch.limiter, Acked). -new_tx() -> #tx{msgs = queue:new(), acks = []}. +%% {Msgs, Acks} +%% +%% Msgs is a queue. +%% +%% Acks looks s.t. like this: +%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...] +%% +%% Each element is a pair consisting of a tag and a list of +%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true' +%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as +%% well as the list overall, are in "most-recent (generally youngest) +%% ack first" order. +new_tx() -> {queue:new(), []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1457,7 +1454,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L]; ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks]. -maybe_complete_tx(State = #ch{tx = #tx{}}) -> +ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]). + +maybe_complete_tx(State = #ch{tx = {_, _}}) -> State; maybe_complete_tx(State = #ch{unconfirmed = UC}) -> case dtree:is_empty(UC) of @@ -1489,9 +1488,9 @@ i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); -i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs); +i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; -i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks); +i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); |