summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-12-24 00:39:35 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-12-24 00:39:35 +0000
commit50ac22a8921a3952b157323c95cee491934262eb (patch)
tree1b11aee7bda9cec058be83b8910d905769caa292
parent350fec50820e21c21cc21304254b0533da279b96 (diff)
downloadrabbitmq-server-50ac22a8921a3952b157323c95cee491934262eb.tar.gz
refactor: introduce helper to simplify update_consumer_use call sites
-rw-r--r--src/rabbit_amqqueue_process.erl30
1 files changed, 15 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8c9da001..048c4a6d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -486,12 +486,10 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers,
- consumer_use = CUInfo}) ->
+ State = #q{active_consumers = ActiveConsumers}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false,
- State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
+ {false, update_consumer_use(State, inactive)};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -542,14 +540,17 @@ deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
{Result, is_empty(State1), State1}.
-update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
+update_consumer_use(State = #q{consumer_use = CUInfo}, Use) ->
+ State#q{consumer_use = update_consumer_use1(CUInfo, Use)}.
+
+update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
-update_consumer_use({active, _, _} = CUInfo, active) ->
+update_consumer_use1({active, _, _} = CUInfo, active) ->
CUInfo;
-update_consumer_use({active, Since, Avg}, inactive) ->
+update_consumer_use1({active, Since, Avg}, inactive) ->
Now = now_micros(),
{inactive, Now, Now - Since, Avg};
-update_consumer_use({inactive, Since, Active, Avg}, active) ->
+update_consumer_use1({inactive, Since, Active, Avg}, active) ->
Now = now_micros(),
{active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
@@ -739,7 +740,7 @@ possibly_unblock(State, ChPid, Update) ->
end
end.
-unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
+unblock(State, C = #cr{limiter = Limiter}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -751,14 +752,13 @@ unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- State1 = State#q{consumer_use =
- update_consumer_use(CUInfo, active)},
- AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
- State2 = State1#q{active_consumers = AC1},
+ AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
+ State1 = update_consumer_use(State#q{active_consumers = AC1},
+ active),
[notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State2) ||
+ consumer_unblocked, [{consumer_tag, CTag}], State1) ||
{_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State2)
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;