diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-24 00:39:35 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-24 00:39:35 +0000 |
commit | 50ac22a8921a3952b157323c95cee491934262eb (patch) | |
tree | 1b11aee7bda9cec058be83b8910d905769caa292 | |
parent | 350fec50820e21c21cc21304254b0533da279b96 (diff) | |
download | rabbitmq-server-50ac22a8921a3952b157323c95cee491934262eb.tar.gz |
refactor: introduce helper to simplify update_consumer_use call sites
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8c9da001..048c4a6d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -486,12 +486,10 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers, - consumer_use = CUInfo}) -> + State = #q{active_consumers = ActiveConsumers}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, - State#q{consumer_use = update_consumer_use(CUInfo, inactive)}}; + {false, update_consumer_use(State, inactive)}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -542,14 +540,17 @@ deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), {Result, is_empty(State1), State1}. -update_consumer_use({inactive, _, _, _} = CUInfo, inactive) -> +update_consumer_use(State = #q{consumer_use = CUInfo}, Use) -> + State#q{consumer_use = update_consumer_use1(CUInfo, Use)}. + +update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; -update_consumer_use({active, _, _} = CUInfo, active) -> +update_consumer_use1({active, _, _} = CUInfo, active) -> CUInfo; -update_consumer_use({active, Since, Avg}, inactive) -> +update_consumer_use1({active, Since, Avg}, inactive) -> Now = now_micros(), {inactive, Now, Now - Since, Avg}; -update_consumer_use({inactive, Since, Active, Avg}, active) -> +update_consumer_use1({inactive, Since, Active, Avg}, active) -> Now = now_micros(), {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. @@ -739,7 +740,7 @@ possibly_unblock(State, ChPid, Update) -> end end. -unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> +unblock(State, C = #cr{limiter = Limiter}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -751,14 +752,13 @@ unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> BlockedQ = priority_queue:from_list(Blocked), UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - State1 = State#q{consumer_use = - update_consumer_use(CUInfo, active)}, - AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), - State2 = State1#q{active_consumers = AC1}, + AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), + State1 = update_consumer_use(State#q{active_consumers = AC1}, + active), [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State2) || + consumer_unblocked, [{consumer_tag, CTag}], State1) || {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State2) + run_message_queue(State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; |