summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-26 14:19:36 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-26 14:19:36 +0100
commit34cb601fc4d0d2d46c661e1e3d746da501a586fc (patch)
tree03aaacae81b860749c90dba5a8d7342aee523e7a
parent157de0376acb3aa95184fb9c50d3fb68fdf82fff (diff)
downloadrabbitmq-server-34cb601fc4d0d2d46c661e1e3d746da501a586fc.tar.gz
unbreak amqqueue_process/limiter logic
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl14
2 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f44f5fec..493e6d24 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -449,10 +449,10 @@ limit_all(QPids, ChPid, Limiter) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate_call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
+ Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index aa2fb0f4..b7401373 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1072,15 +1072,17 @@ handle_cast({limit, ChPid, Limiter}, State) ->
State, ChPid,
fun (C = #cr{consumer_count = ConsumerCount,
limiter = OldLimiter,
- is_limit_active = Limited}) ->
- if ConsumerCount =/= 0 ->
+ is_limit_active = OldLimited}) ->
+ case {ConsumerCount =/= 0,
+ not rabbit_limiter:is_enabled(OldLimiter)} of
+ {true, true} ->
ok = rabbit_limiter:register(Limiter, self());
- true ->
+ {_, _} ->
ok
end,
- NewLimited = Limited,
- C#cr{limiter = Limiter,
- is_limit_active = NewLimited}
+ Limited =
+ OldLimited andalso rabbit_limiter:is_enabled(Limiter),
+ C#cr{limiter = Limiter, is_limit_active = Limited}
end));
handle_cast({flush, ChPid}, State) ->