summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-06 16:54:08 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-06 16:54:08 +0000
commitc82c5d3168489bd7c9f72eece48578a7c9d3a270 (patch)
tree0c0a8891480674960e92084c013d0a24fc6e369b /src/rabbit_queue_consumers.erl
parentdc767c96d9150a143d15bf0dbf5cd836596ccde5 (diff)
downloadrabbitmq-server-c82c5d3168489bd7c9f72eece48578a7c9d3a270.tar.gz
refactor: prepare for more state
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl81
1 files changed, 45 insertions, 36 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index d5d2a150..40daec32 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -26,6 +26,8 @@
-define(UNSENT_MESSAGE_LIMIT, 200).
+-record(state, {consumers}).
+
-record(consumer, {tag, ack_required, args}).
%% These are held in our process dictionary
@@ -45,37 +47,37 @@
-ifdef(use_specs).
--type consumers() :: priority_queue:q().
+-type state() :: #state{consumers ::priority_queue:q()}.
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
-type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
--spec new() -> consumers().
--spec max_active_priority(consumers()) -> integer() | 'infinity' | 'empty'.
--spec inactive(consumers()) -> boolean().
--spec all(consumers()) -> [{ch(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}].
+-spec new() -> state().
+-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
+-spec inactive(state()) -> boolean().
+-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
credit_args(), rabbit_framing:amqp_table(), boolean(),
- consumers()) -> consumers().
--spec remove(ch(), rabbit_types:ctag(), consumers()) ->
- 'not_found' | consumers().
--spec erase_ch(ch(), consumers()) ->
+ state()) -> state().
+-spec remove(ch(), rabbit_types:ctag(), state()) ->
+ 'not_found' | state().
+-spec erase_ch(ch(), state()) ->
'not_found' | {[ack()], [rabbit_types:ctag()],
- consumers()}.
+ state()}.
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
- boolean(), rabbit_amqqueue:name(), T, consumers()) ->
- {boolean(), [{ch(), rabbit_types:ctag()}], T, consumers()}.
+ boolean(), rabbit_amqqueue:name(), T, state()) ->
+ {boolean(), [{ch(), rabbit_types:ctag()}], T, state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
--spec possibly_unblock(cr_fun(), ch(), consumers()) ->
+-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' |
- {'unblocked', [rabbit_types:ctag()], consumers()}.
+ {'unblocked', [rabbit_types:ctag()], state()}.
-spec resume_fun() -> cr_fun().
-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
-spec activate_limit_fun() -> cr_fun().
@@ -86,13 +88,15 @@
%%----------------------------------------------------------------------------
-new() -> priority_queue:new().
+new() -> #state{consumers = priority_queue:new()}.
-max_active_priority(Consumers) -> priority_queue:highest(Consumers).
+max_active_priority(#state{consumers = Consumers}) ->
+ priority_queue:highest(Consumers).
-inactive(Consumers) -> priority_queue:is_empty(Consumers).
+inactive(#state{consumers = Consumers}) ->
+ priority_queue:is_empty(Consumers).
-all(Consumers) ->
+all(#state{consumers = Consumers}) ->
lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
consumers(Consumers, []), all_ch_record()).
@@ -109,7 +113,7 @@ unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
- Drained, Consumers) ->
+ Drained, State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -130,9 +134,9 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck,
args = OtherArgs},
- add_consumer({ChPid, Consumer}, Consumers).
+ State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
-remove(ChPid, ConsumerTag, Consumers) ->
+remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
@@ -148,10 +152,11 @@ remove(ChPid, ConsumerTag, Consumers) ->
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
- remove_consumer(ChPid, ConsumerTag, Consumers)
+ State#state{consumers =
+ remove_consumer(ChPid, ConsumerTag, Consumers)}
end.
-erase_ch(ChPid, Consumers) ->
+erase_ch(ChPid, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
@@ -162,26 +167,28 @@ erase_ch(ChPid, Consumers) ->
ok = erase_ch_record(C),
{queue:to_list(ChAckTags),
tags(priority_queue:to_list(AllConsumers)),
- remove_consumers(ChPid, Consumers)}
+ State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, Stop, QName, S, Consumers) ->
- deliver(FetchFun, Stop, QName, [], S, Consumers).
+deliver(FetchFun, Stop, QName, S, State) ->
+ deliver(FetchFun, Stop, QName, [], S, State).
-deliver(_FetchFun, true, _QName, Blocked, S, Consumers) ->
- {true, Blocked, S, Consumers};
-deliver( FetchFun, false, QName, Blocked, S, Consumers) ->
+deliver(_FetchFun, true, _QName, Blocked, S, State) ->
+ {true, Blocked, S, State};
+deliver( FetchFun, false, QName, Blocked, S, State = #state{
+ consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, Consumers};
+ {false, Blocked, S, State};
{{value, QEntry, Priority}, Tail} ->
{Stop, Blocked1, S1, Consumers1} =
deliver_to_consumer(FetchFun, QEntry, Priority, QName,
Blocked, S, Tail),
- deliver(FetchFun, Stop, QName, Blocked1, S1, Consumers1)
+ deliver(FetchFun, Stop, QName, Blocked1, S1,
+ State#state{consumers = Consumers1})
end.
deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
@@ -250,18 +257,19 @@ subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
{{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
end.
-possibly_unblock(Update, ChPid, Consumers) ->
+possibly_unblock(Update, ChPid, State) ->
case lookup_ch(ChPid) of
not_found -> unchanged;
C -> C1 = Update(C),
case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
false -> update_ch_record(C1),
unchanged;
- true -> unblock(C1, Consumers)
+ true -> unblock(C1, State)
end
end.
-unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, Consumers) ->
+unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
+ State = #state{consumers = Consumers}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -275,7 +283,8 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, Consumers) ->
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
tags(Unblocked),
- priority_queue:join(Consumers, UnblockedQ)}
+ State#state{consumers =
+ priority_queue:join(Consumers, UnblockedQ)}}
end.
resume_fun() ->