summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-06 19:18:33 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-06 19:18:33 +0000
commit935b4eb6b4f74507434f818dbcdde975125a8e7e (patch)
tree1755275deb5e5d98d1c623dba6d56b93c56155a9 /src/rabbit_queue_consumers.erl
parentc82c5d3168489bd7c9f72eece48578a7c9d3a270 (diff)
downloadrabbitmq-server-935b4eb6b4f74507434f818dbcdde975125a8e7e.tar.gz
move consumer_use calculations into rabbit_queue_consumers
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl55
1 files changed, 45 insertions, 10 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 40daec32..ea0ab6da 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -20,13 +20,14 @@
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/5, record_ack/3, subtract_acks/2,
possibly_unblock/3,
- resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4]).
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
+ utilisation/1]).
%%----------------------------------------------------------------------------
-define(UNSENT_MESSAGE_LIMIT, 200).
--record(state, {consumers}).
+-record(state, {consumers, use}).
-record(consumer, {tag, ack_required, args}).
@@ -47,7 +48,12 @@
-ifdef(use_specs).
--type state() :: #state{consumers ::priority_queue:q()}.
+-type time_micros() :: non_neg_integer().
+-type ratio() :: float().
+-type state() :: #state{consumers ::priority_queue:q(),
+ use :: {'inactive',
+ time_micros(), time_micros(), ratio()} |
+ {'active', time_micros(), ratio()}}.
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
@@ -83,12 +89,14 @@
-spec activate_limit_fun() -> cr_fun().
-spec credit_fun(boolean(), non_neg_integer(), boolean(),
rabbit_types:ctag()) -> cr_fun().
+-spec utilisation(state()) -> ratio().
-endif.
%%----------------------------------------------------------------------------
-new() -> #state{consumers = priority_queue:new()}.
+new() -> #state{consumers = priority_queue:new(),
+ use = {inactive, now_micros(), 0, 0.0}}.
max_active_priority(#state{consumers = Consumers}) ->
priority_queue:highest(Consumers).
@@ -178,11 +186,11 @@ deliver(FetchFun, Stop, QName, S, State) ->
deliver(_FetchFun, true, _QName, Blocked, S, State) ->
{true, Blocked, S, State};
-deliver( FetchFun, false, QName, Blocked, S, State = #state{
- consumers = Consumers}) ->
+deliver( FetchFun, false, QName, Blocked, S,
+ State = #state{consumers = Consumers, use = Use}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, State};
+ {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
{{value, QEntry, Priority}, Tail} ->
{Stop, Blocked1, S1, Consumers1} =
deliver_to_consumer(FetchFun, QEntry, Priority, QName,
@@ -269,7 +277,7 @@ possibly_unblock(Update, ChPid, State) ->
end.
unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
- State = #state{consumers = Consumers}) ->
+ State = #state{consumers = Consumers, use = Use}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -283,8 +291,8 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
tags(Unblocked),
- State#state{consumers =
- priority_queue:join(Consumers, UnblockedQ)}}
+ State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
+ use = update_use(Use, active)}}
end.
resume_fun() ->
@@ -312,6 +320,11 @@ credit_fun(IsEmpty, Credit, Drain, CTag) ->
end
end.
+utilisation(#state{use = {active, Since, Avg}}) ->
+ use_avg(now_micros() - Since, 0, Avg);
+utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
+ use_avg(Active, now_micros() - Since, Avg).
+
%%----------------------------------------------------------------------------
lookup_ch(ChPid) ->
@@ -389,3 +402,25 @@ remove_consumers(ChPid, Queue) ->
priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
(_) -> true
end, Queue).
+
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, use_avg(Active, Now - Since, Avg)}.
+
+use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
+now_micros() -> timer:now_diff(now(), {0,0,0}).