summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-22 00:02:55 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-22 00:02:55 +0000
commitaaec8e3c296797cc0c83a2ab2e408111df6f078e (patch)
tree66215b655cd17cc4ac540e71683a0e042fa3ab6a
parente0896c4872fa32d4300e3c615148c7141aa6ac99 (diff)
downloadrabbitmq-server-aaec8e3c296797cc0c83a2ab2e408111df6f078e.tar.gz
inline
-rw-r--r--src/rabbit_channel.erl18
1 files changed, 7 insertions, 11 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2d49b8b2..7f9ff827 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -856,8 +856,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
PrefetchCount, queue:len(UAMQ)),
- {reply, #'basic.qos_ok'{},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
+ case ((not rabbit_limiter:is_active(Limiter)) andalso
+ rabbit_limiter:is_active(Limiter1)) of
+ true -> rabbit_amqqueue:activate_limit_all(
+ consumer_queues(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
@@ -1412,15 +1417,6 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-maybe_limit_queues(OldLimiter, NewLimiter, State) ->
- case ((not rabbit_limiter:is_active(OldLimiter)) andalso
- rabbit_limiter:is_active(NewLimiter)) of
- true -> Queues = consumer_queues(State#ch.consumer_mapping),
- rabbit_amqqueue:activate_limit_all(Queues, self());
- false -> ok
- end,
- State.
-
consumer_queues(Consumers) ->
lists:usort([QPid ||
{_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).