diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 14:19:36 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 14:19:36 +0100 |
commit | 34cb601fc4d0d2d46c661e1e3d746da501a586fc (patch) | |
tree | 03aaacae81b860749c90dba5a8d7342aee523e7a | |
parent | 157de0376acb3aa95184fb9c50d3fb68fdf82fff (diff) | |
download | rabbitmq-server-34cb601fc4d0d2d46c661e1e3d746da501a586fc.tar.gz |
unbreak amqqueue_process/limiter logic
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 |
2 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f44f5fec..493e6d24 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -449,10 +449,10 @@ limit_all(QPids, ChPid, Limiter) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index aa2fb0f4..b7401373 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1072,15 +1072,17 @@ handle_cast({limit, ChPid, Limiter}, State) -> State, ChPid, fun (C = #cr{consumer_count = ConsumerCount, limiter = OldLimiter, - is_limit_active = Limited}) -> - if ConsumerCount =/= 0 -> + is_limit_active = OldLimited}) -> + case {ConsumerCount =/= 0, + not rabbit_limiter:is_enabled(OldLimiter)} of + {true, true} -> ok = rabbit_limiter:register(Limiter, self()); - true -> + {_, _} -> ok end, - NewLimited = Limited, - C#cr{limiter = Limiter, - is_limit_active = NewLimited} + Limited = + OldLimited andalso rabbit_limiter:is_enabled(Limiter), + C#cr{limiter = Limiter, is_limit_active = Limited} end)); handle_cast({flush, ChPid}, State) -> |