diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-06 04:48:18 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-06 04:48:18 +0000 |
commit | 2683e219af2c91432a0b5453d978b175c02fea5c (patch) | |
tree | 4eba12e38af9c2fd1f9b67131df7cf4a82fe2ee5 | |
parent | 0b7c1a6c6ee008f836efb7d40a48a1df9d850fac (diff) | |
download | rabbitmq-server-2683e219af2c91432a0b5453d978b175c02fea5c.tar.gz |
optimise rabbit_channel:ack/2
- moving the ?INCR_STATS call inside the fold_per_queue fun reduces
consing when stats are disabled
- since this was the only call to fold_per_queue where we cared about
the result, and we no longer do, we can switch the 'fold' to
'foreach'
-rw-r--r-- | src/rabbit_channel.erl | 55 |
1 files changed, 26 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index aaa463f1..68625dbf 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -800,14 +800,12 @@ handle_method(#'basic.recover_async'{requeue = true}, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_misc:with_exit_handler( - OkFun, fun () -> - rabbit_amqqueue:requeue( - QPid, MsgIds, self()) - end) - end, ok, UAMQL), + foreach_per_queue( + fun (QPid, MsgIds) -> + rabbit_misc:with_exit_handler( + OkFun, + fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end) + end, UAMQL), ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method @@ -1215,10 +1213,10 @@ reject(DeliveryTag, Requeue, Multiple, end}. reject(Requeue, Acked, Limiter) -> - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) - end, ok, Acked), + foreach_per_queue( + fun (QPid, MsgIds) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, Acked), ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, @@ -1267,17 +1265,16 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> end. ack(Acked, State = #ch{queue_names = QNames}) -> - Incs = fold_per_queue( - fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - case dict:find(QPid, QNames) of - {ok, QName} -> Count = length(MsgIds), - [{queue_stats, QName, Count} | L]; - error -> L - end - end, [], Acked), - ok = notify_limiter(State#ch.limiter, Acked), - ?INCR_STATS(Incs, ack, State). + foreach_per_queue( + fun (QPid, MsgIds) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + ?INCR_STATS(case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count}]; + error -> [] + end, ack, State) + end, Acked), + ok = notify_limiter(State#ch.limiter, Acked). new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}. @@ -1289,15 +1286,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers, sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. -fold_per_queue(_F, Acc, []) -> - Acc; -fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case - F(QPid, [MsgId], Acc); -fold_per_queue(F, Acc, UAL) -> +foreach_per_queue(_F, []) -> + ok; +foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId]); +foreach_per_queue(F, UAL) -> T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons(QPid, MsgId, T) end, gb_trees:empty(), UAL), - rabbit_misc:gb_trees_fold(F, Acc, T). + rabbit_misc:gb_trees_foreach(F, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> |