summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-06 04:48:18 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-06 04:48:18 +0000
commit2683e219af2c91432a0b5453d978b175c02fea5c (patch)
tree4eba12e38af9c2fd1f9b67131df7cf4a82fe2ee5
parent0b7c1a6c6ee008f836efb7d40a48a1df9d850fac (diff)
downloadrabbitmq-server-2683e219af2c91432a0b5453d978b175c02fea5c.tar.gz
optimise rabbit_channel:ack/2
- moving the ?INCR_STATS call inside the fold_per_queue fun reduces consing when stats are disabled - since this was the only call to fold_per_queue where we cared about the result, and we no longer do, we can switch the 'fold' to 'foreach'
-rw-r--r--src/rabbit_channel.erl55
1 files changed, 26 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index aaa463f1..68625dbf 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -800,14 +800,12 @@ handle_method(#'basic.recover_async'{requeue = true},
limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_misc:with_exit_handler(
- OkFun, fun () ->
- rabbit_amqqueue:requeue(
- QPid, MsgIds, self())
- end)
- end, ok, UAMQL),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_misc:with_exit_handler(
+ OkFun,
+ fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ end, UAMQL),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
@@ -1215,10 +1213,10 @@ reject(DeliveryTag, Requeue, Multiple,
end}.
reject(Requeue, Acked, Limiter) ->
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, Acked),
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
@@ -1267,17 +1265,16 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
end.
ack(Acked, State = #ch{queue_names = QNames}) ->
- Incs = fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- case dict:find(QPid, QNames) of
- {ok, QName} -> Count = length(MsgIds),
- [{queue_stats, QName, Count} | L];
- error -> L
- end
- end, [], Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- ?INCR_STATS(Incs, ack, State).
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ ?INCR_STATS(case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count}];
+ error -> []
+ end, ack, State)
+ end, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked).
new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}.
@@ -1289,15 +1286,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-fold_per_queue(_F, Acc, []) ->
- Acc;
-fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId], Acc);
-fold_per_queue(F, Acc, UAL) ->
+foreach_per_queue(_F, []) ->
+ ok;
+foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId]);
+foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
end, gb_trees:empty(), UAL),
- rabbit_misc:gb_trees_fold(F, Acc, T).
+ rabbit_misc:gb_trees_foreach(F, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->