diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-17 12:57:33 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-17 12:57:33 +0100 |
commit | da22e75f58ec28d6597bc721e7eedba5ecdcc317 (patch) | |
tree | 922a22b1ed7e2ff9c945e97708ac034906fa2419 | |
parent | 6ee9bd43c43cfa09aa605801530aada1de4cdbe3 (diff) | |
download | rabbitmq-server-da22e75f58ec28d6597bc721e7eedba5ecdcc317.tar.gz |
Cosmeticsbug24285
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 37 |
3 files changed, 19 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 07079375..88ff26cc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -443,9 +443,7 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:invoke_no_result( - QPids, fun (QPid) -> - gen_server2:cast(QPid, {limit, ChPid, Limiter}) - end). + QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ac40bd44..11a95a62 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1107,13 +1107,10 @@ handle_cast({limit, ChPid, Limiter}, State) -> is_limit_active = OldLimited}) -> case (ConsumerCount =/= 0 andalso not rabbit_limiter:is_enabled(OldLimiter)) of - true -> - ok = rabbit_limiter:register(Limiter, self()); - false -> - ok + true -> ok = rabbit_limiter:register(Limiter, self()); + false -> ok end, - Limited = - OldLimited andalso rabbit_limiter:is_enabled(Limiter), + Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), C#cr{limiter = Limiter, is_limit_active = Limited} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 151a9801..dfe84644 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -814,18 +814,15 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> - Limiter1 = - case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of - {false, 0} -> Limiter; - {false, _} -> enable_limiter(State); - {_, _} -> Limiter - end, + Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of + {false, 0} -> Limiter; + {false, _} -> enable_limiter(State); + {_, _} -> Limiter + end, Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of - ok -> - Limiter1; - {disabled, Limiter2} -> - ok = limit_queues(Limiter2, State), - Limiter2 + ok -> Limiter1; + {disabled, Limiter2} -> ok = limit_queues(Limiter2, State), + Limiter2 end, {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}}; @@ -1094,14 +1091,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter = Limiter}) -> Limiter2 = case rabbit_limiter:unblock(Limiter) of - ok -> - Limiter; - {disabled, Limiter1} -> - ok = limit_queues(Limiter1, State), - Limiter1 + ok -> Limiter; + {disabled, Limiter1} -> ok = limit_queues(Limiter1, State), + Limiter1 end, - {reply, #'channel.flow_ok'{active = true}, - State#ch{limiter = Limiter2}}; + {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}}; handle_method(#'channel.flow'{active = false}, _, State = #ch{consumer_mapping = Consumers, @@ -1323,10 +1317,9 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> case rabbit_limiter:is_enabled(Limiter) of false -> ok; - true -> case rabbit_misc:queue_fold( - fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of + true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(Limiter, Count) end |