diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-01 11:26:55 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-01 11:26:55 +0100 |
commit | 27d31e27a3831ac72b4af816b42bb2e67886c791 (patch) | |
tree | d546d0d39c20fc411ec4efd78d7e3095e68d052b | |
parent | 96411977907f1f4a5b7230aaa5a20a2b93c4fd0b (diff) | |
download | rabbitmq-server-27d31e27a3831ac72b4af816b42bb2e67886c791.tar.gz |
collect acks in lists rather than queues
...which is slightly simpler and more efficient.
This also allows us to optimise for the common case in fold_per_queue.
-rw-r--r-- | src/rabbit_channel.erl | 46 |
1 files changed, 26 insertions, 20 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3d264330..660abb31 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -663,14 +663,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ, - tx_status = TxStatus}) -> + _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of none -> ack(Acked, State1); - in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked), + in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, + queue:from_list( + lists:reverse(Acked))), State1#ch{uncommitted_ack_q = NewTAQ} end}; @@ -833,6 +834,7 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, + UAMQL = queue:to_list(UAMQ), ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_misc:with_exit_handler( @@ -840,8 +842,8 @@ handle_method(#'basic.recover_async'{requeue = true}, rabbit_amqqueue:requeue( QPid, MsgIds, self()) end) - end, ok, UAMQ), - ok = notify_limiter(Limiter, UAMQ), + end, ok, UAMQL), + ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1066,8 +1068,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, uncommitted_ack_q = TAQ}) -> - State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2, - State, TMQ))), + State1 = new_tx(ack(queue:to_list(TAQ), + rabbit_misc:queue_fold(fun deliver_to_queues/2, + State, TMQ))), {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> @@ -1276,18 +1279,18 @@ ack_record(DeliveryTag, ConsumerTag, {DeliveryTag, ConsumerTag, {QPid, MsgId}}. collect_acks(Q, 0, true) -> - {Q, queue:new()}; + {queue:to_list(Q), queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> - collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple). + collect_acks([], queue:new(), Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case queue:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> - {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)}; + {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)}; Multiple -> - collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc, + collect_acks([UnackedMsg | ToAcc], PrefixAcc, QTail, DeliveryTag, Multiple); true -> collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc), @@ -1316,12 +1319,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) -> {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), State#ch{state = closing}}. -fold_per_queue(F, Acc0, UAQ) -> - T = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> - rabbit_misc:gb_trees_cons(QPid, MsgId, T) - end, gb_trees:empty(), UAQ), - rabbit_misc:gb_trees_fold(F, Acc0, T). +fold_per_queue(_F, Acc, []) -> + Acc; +fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId], Acc); +fold_per_queue(F, Acc, UAL) -> + T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons(QPid, MsgId, T) + end, gb_trees:empty(), UAL), + rabbit_misc:gb_trees_fold(F, Acc, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1343,9 +1349,9 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> case rabbit_limiter:is_enabled(Limiter) of false -> ok; - true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of + true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(Limiter, Count) end |