summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-01 11:26:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-01 11:26:55 +0100
commit27d31e27a3831ac72b4af816b42bb2e67886c791 (patch)
treed546d0d39c20fc411ec4efd78d7e3095e68d052b
parent96411977907f1f4a5b7230aaa5a20a2b93c4fd0b (diff)
downloadrabbitmq-server-27d31e27a3831ac72b4af816b42bb2e67886c791.tar.gz
collect acks in lists rather than queues
...which is slightly simpler and more efficient. This also allows us to optimise for the common case in fold_per_queue.
-rw-r--r--src/rabbit_channel.erl46
1 files changed, 26 insertions, 20 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3d264330..660abb31 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -663,14 +663,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ,
- tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
none -> ack(Acked, State1);
- in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
+ in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q,
+ queue:from_list(
+ lists:reverse(Acked))),
State1#ch{uncommitted_ack_q = NewTAQ}
end};
@@ -833,6 +834,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
OkFun = fun () -> ok end,
+ UAMQL = queue:to_list(UAMQ),
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_misc:with_exit_handler(
@@ -840,8 +842,8 @@ handle_method(#'basic.recover_async'{requeue = true},
rabbit_amqqueue:requeue(
QPid, MsgIds, self())
end)
- end, ok, UAMQ),
- ok = notify_limiter(Limiter, UAMQ),
+ end, ok, UAMQL),
+ ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1066,8 +1068,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_ack_q = TAQ}) ->
- State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
+ State1 = new_tx(ack(queue:to_list(TAQ),
+ rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ State, TMQ))),
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
@@ -1276,18 +1279,18 @@ ack_record(DeliveryTag, ConsumerTag,
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.
collect_acks(Q, 0, true) ->
- {Q, queue:new()};
+ {queue:to_list(Q), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
Multiple ->
- collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc,
+ collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
@@ -1316,12 +1319,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) ->
{rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
State#ch{state = closing}}.
-fold_per_queue(F, Acc0, UAQ) ->
- T = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons(QPid, MsgId, T)
- end, gb_trees:empty(), UAQ),
- rabbit_misc:gb_trees_fold(F, Acc0, T).
+fold_per_queue(_F, Acc, []) ->
+ Acc;
+fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId], Acc);
+fold_per_queue(F, Acc, UAL) ->
+ T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ end, gb_trees:empty(), UAL),
+ rabbit_misc:gb_trees_fold(F, Acc, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1343,9 +1349,9 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
- true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
+ true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end