diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-06 19:18:33 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-06 19:18:33 +0000 |
commit | 935b4eb6b4f74507434f818dbcdde975125a8e7e (patch) | |
tree | 1755275deb5e5d98d1c623dba6d56b93c56155a9 /src/rabbit_queue_consumers.erl | |
parent | c82c5d3168489bd7c9f72eece48578a7c9d3a270 (diff) | |
download | rabbitmq-server-935b4eb6b4f74507434f818dbcdde975125a8e7e.tar.gz |
move consumer_use calculations into rabbit_queue_consumers
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 55 |
1 files changed, 45 insertions, 10 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 40daec32..ea0ab6da 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -20,13 +20,14 @@ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/5, record_ack/3, subtract_acks/2, possibly_unblock/3, - resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4]). + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, + utilisation/1]). %%---------------------------------------------------------------------------- -define(UNSENT_MESSAGE_LIMIT, 200). --record(state, {consumers}). +-record(state, {consumers, use}). -record(consumer, {tag, ack_required, args}). @@ -47,7 +48,12 @@ -ifdef(use_specs). --type state() :: #state{consumers ::priority_queue:q()}. +-type time_micros() :: non_neg_integer(). +-type ratio() :: float(). +-type state() :: #state{consumers ::priority_queue:q(), + use :: {'inactive', + time_micros(), time_micros(), ratio()} | + {'active', time_micros(), ratio()}}. -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). @@ -83,12 +89,14 @@ -spec activate_limit_fun() -> cr_fun(). -spec credit_fun(boolean(), non_neg_integer(), boolean(), rabbit_types:ctag()) -> cr_fun(). +-spec utilisation(state()) -> ratio(). -endif. %%---------------------------------------------------------------------------- -new() -> #state{consumers = priority_queue:new()}. +new() -> #state{consumers = priority_queue:new(), + use = {inactive, now_micros(), 0, 0.0}}. max_active_priority(#state{consumers = Consumers}) -> priority_queue:highest(Consumers). @@ -178,11 +186,11 @@ deliver(FetchFun, Stop, QName, S, State) -> deliver(_FetchFun, true, _QName, Blocked, S, State) -> {true, Blocked, S, State}; -deliver( FetchFun, false, QName, Blocked, S, State = #state{ - consumers = Consumers}) -> +deliver( FetchFun, false, QName, Blocked, S, + State = #state{consumers = Consumers, use = Use}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, State}; + {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; {{value, QEntry, Priority}, Tail} -> {Stop, Blocked1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, @@ -269,7 +277,7 @@ possibly_unblock(Update, ChPid, State) -> end. unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, - State = #state{consumers = Consumers}) -> + State = #state{consumers = Consumers, use = Use}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -283,8 +291,8 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, tags(Unblocked), - State#state{consumers = - priority_queue:join(Consumers, UnblockedQ)}} + State#state{consumers = priority_queue:join(Consumers, UnblockedQ), + use = update_use(Use, active)}} end. resume_fun() -> @@ -312,6 +320,11 @@ credit_fun(IsEmpty, Credit, Drain, CTag) -> end end. +utilisation(#state{use = {active, Since, Avg}}) -> + use_avg(now_micros() - Since, 0, Avg); +utilisation(#state{use = {inactive, Since, Active, Avg}}) -> + use_avg(Active, now_micros() - Since, Avg). + %%---------------------------------------------------------------------------- lookup_ch(ChPid) -> @@ -389,3 +402,25 @@ remove_consumers(ChPid, Queue) -> priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false; (_) -> true end, Queue). + +update_use({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_use({active, _, _} = CUInfo, active) -> + CUInfo; +update_use({active, Since, Avg}, inactive) -> + Now = now_micros(), + {inactive, Now, Now - Since, Avg}; +update_use({inactive, Since, Active, Avg}, active) -> + Now = now_micros(), + {active, Now, use_avg(Active, Now - Since, Avg)}. + +use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. + +now_micros() -> timer:now_diff(now(), {0,0,0}). |