summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-25 15:22:23 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-25 15:22:23 +0100
commitc26fa8029b6d8501cb14345497f196de7988539b (patch)
tree25f393609a373e6ef51d005e3815ab1e137913d9
parentb8d6d933cdcbf30cce9703139c2dbd3d6137f94c (diff)
downloadrabbitmq-server-c26fa8029b6d8501cb14345497f196de7988539b.tar.gz
refactor
-rw-r--r--src/rabbit_channel.erl21
1 files changed, 10 insertions, 11 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5c3e5cc9..ae747332 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -806,9 +806,11 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
{_, _} -> LimiterToken
end,
LimiterToken3 = case rabbit_limiter:limit(LimiterToken1, PrefetchCount) of
- ok -> LimiterToken1;
- {disabled, LimiterToken2} -> unlimit_queues(State),
- LimiterToken2
+ ok ->
+ LimiterToken1;
+ {disabled, LimiterToken2} ->
+ ok = limit_queues(LimiterToken2, State),
+ LimiterToken2
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter_token = LimiterToken3}};
@@ -1077,9 +1079,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_token = LimiterToken}) ->
LimiterToken2 = case rabbit_limiter:unblock(LimiterToken) of
- ok -> LimiterToken;
- {disabled, LimiterToken1} -> unlimit_queues(State),
- LimiterToken1
+ ok ->
+ LimiterToken;
+ {disabled, LimiterToken1} ->
+ ok = limit_queues(LimiterToken1, State),
+ LimiterToken1
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_token = LimiterToken2}};
@@ -1285,11 +1289,6 @@ enable_limiter(State = #ch{unacked_message_q = UAMQ,
ok = limit_queues(LimiterToken1, State),
LimiterToken1.
-unlimit_queues(State = #ch{limiter_token = LimiterToken}) ->
- LimiterToken1 = rabbit_limiter:disable(LimiterToken),
- ok = limit_queues(LimiterToken1, State),
- LimiterToken1.
-
limit_queues(Token, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Token).