summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-12-24 10:39:46 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-12-24 10:39:46 +0000
commit77bdb17b5e75e4b1b1c0ed8aa27b092b1d9d0f31 (patch)
tree3a50a9d0d8858f37029fef4fbcbabeb0cde724d1
parent86428bcf7ed570d07b970e00b99efb89f824df41 (diff)
downloadrabbitmq-server-77bdb17b5e75e4b1b1c0ed8aa27b092b1d9d0f31.tar.gz
refactor: re-order and re-name
-rw-r--r--src/rabbit_queue_consumers.erl129
1 files changed, 64 insertions, 65 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f94bd46c..75093cee 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -16,12 +16,11 @@
-module(rabbit_queue_consumers).
--export([new/0, max_active_priority/1, inactive/1,
- unacknowledged_message_count/0, erase_ch/2, deliver/5,
- add/9, remove/3, send_drained/0, record_ack/3, subtract_acks/2,
+-export([new/0, max_active_priority/1, inactive/1, count/0, all/1,
+ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ send_drained/0, deliver/5, record_ack/3, subtract_acks/2,
possibly_unblock/3,
- resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
- count/0, all/1]).
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4]).
%%----------------------------------------------------------------------------
@@ -50,23 +49,22 @@ max_active_priority(Consumers) -> priority_queue:highest(Consumers).
inactive(Consumers) -> priority_queue:is_empty(Consumers).
+count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+
+all(Consumers) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
+ consumers(Consumers, []), all_ch_record()).
+
+consumers(Consumers, Acc) ->
+ priority_queue:fold(
+ fun ({ChPid, Consumer}, _P, Acc1) ->
+ #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Args} | Acc1]
+ end, Acc, Consumers).
+
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-erase_ch(ChPid, Consumers) ->
- case lookup_ch(ChPid) of
- not_found ->
- not_found;
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- blocked_consumers = BlockedQ} ->
- AllConsumers = priority_queue:join(Consumers, BlockedQ),
- ok = erase_ch_record(C),
- {queue:to_list(ChAckTags),
- tags(priority_queue:to_list(AllConsumers)),
- remove_consumers(ChPid, Consumers)}
- end.
-
add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
Drained, Consumers) ->
C = #cr{consumer_count = Count,
@@ -110,32 +108,21 @@ remove(ChPid, ConsumerTag, Consumers) ->
remove_consumer(ChPid, ConsumerTag, Consumers)
end.
-send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()].
-
-record_ack(ChPid, LimiterPid, AckTag) ->
- C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
- update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
- ok.
-
-subtract_acks(ChPid, AckTags) ->
+erase_ch(ChPid, Consumers) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
- ok
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = BlockedQ} ->
+ AllConsumers = priority_queue:join(Consumers, BlockedQ),
+ ok = erase_ch_record(C),
+ {queue:to_list(ChAckTags),
+ tags(priority_queue:to_list(AllConsumers)),
+ remove_consumers(ChPid, Consumers)}
end.
-subtract_acks([], [], AckQ) ->
- AckQ;
-subtract_acks([], Prefix, AckQ) ->
- queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
-subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
- case queue:out(AckQ) of
- {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
- {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
- end.
+send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()].
deliver(DeliverFun, Stop, QName, S, Consumers) ->
deliver(DeliverFun, Stop, QName, [], S, Consumers).
@@ -148,12 +135,13 @@ deliver( DeliverFun, false, QName, Blocked, S, Consumers) ->
{false, Blocked, S, Consumers};
{{value, QEntry, Priority}, Tail} ->
{Stop, Blocked1, S1, Consumers1} =
- deliver1(DeliverFun, QEntry, Priority, QName, Blocked, S, Tail),
+ deliver_to_consumer(DeliverFun, QEntry, Priority, QName,
+ Blocked, S, Tail),
deliver(DeliverFun, Stop, QName, Blocked1, S1, Consumers1)
end.
-deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName,
- Blocked, S, Consumers) ->
+deliver_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, QName,
+ Blocked, S, Consumers) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
@@ -167,7 +155,7 @@ deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName,
Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
{false, Blocked1, S, Consumers};
{continue, Limiter} ->
- {Stop, S1} = deliver1(
+ {Stop, S1} = deliver_to_consumer(
DeliverFun, Consumer,
C#cr{limiter = Limiter}, QName, S),
{Stop, Blocked, S1,
@@ -175,13 +163,13 @@ deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName,
end
end.
-deliver1(DeliverFun,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired},
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- unsent_message_count = Count},
- QName, S) ->
+deliver_to_consumer(DeliverFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ QName, S) ->
{{Message, IsDelivered, AckTag}, Stop, S1} = DeliverFun(AckRequired, S),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
@@ -193,6 +181,31 @@ deliver1(DeliverFun,
unsent_message_count = Count + 1}),
{Stop, S1}.
+record_ack(ChPid, LimiterPid, AckTag) ->
+ C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
+ update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
+ ok.
+
+subtract_acks(ChPid, AckTags) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
+ ok
+ end.
+
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
possibly_unblock(Update, ChPid, Consumers) ->
case lookup_ch(ChPid) of
not_found -> unchanged;
@@ -246,20 +259,6 @@ credit_fun(IsEmpty, Credit, Drain, CTag) ->
end
end.
-count() ->
- lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
-
-all(Consumers) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
- consumers(Consumers, []), all_ch_record()).
-
-consumers(Consumers, Acc) ->
- priority_queue:fold(
- fun ({ChPid, Consumer}, _P, Acc1) ->
- #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
- [{ChPid, CTag, Ack, Args} | Acc1]
- end, Acc, Consumers).
-
%%----------------------------------------------------------------------------
lookup_ch(ChPid) ->