diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-24 10:39:46 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-24 10:39:46 +0000 |
commit | 77bdb17b5e75e4b1b1c0ed8aa27b092b1d9d0f31 (patch) | |
tree | 3a50a9d0d8858f37029fef4fbcbabeb0cde724d1 | |
parent | 86428bcf7ed570d07b970e00b99efb89f824df41 (diff) | |
download | rabbitmq-server-77bdb17b5e75e4b1b1c0ed8aa27b092b1d9d0f31.tar.gz |
refactor: re-order and re-name
-rw-r--r-- | src/rabbit_queue_consumers.erl | 129 |
1 files changed, 64 insertions, 65 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f94bd46c..75093cee 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -16,12 +16,11 @@ -module(rabbit_queue_consumers). --export([new/0, max_active_priority/1, inactive/1, - unacknowledged_message_count/0, erase_ch/2, deliver/5, - add/9, remove/3, send_drained/0, record_ack/3, subtract_acks/2, +-export([new/0, max_active_priority/1, inactive/1, count/0, all/1, + unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + send_drained/0, deliver/5, record_ack/3, subtract_acks/2, possibly_unblock/3, - resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, - count/0, all/1]). + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4]). %%---------------------------------------------------------------------------- @@ -50,23 +49,22 @@ max_active_priority(Consumers) -> priority_queue:highest(Consumers). inactive(Consumers) -> priority_queue:is_empty(Consumers). +count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). + +all(Consumers) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, + consumers(Consumers, []), all_ch_record()). + +consumers(Consumers, Acc) -> + priority_queue:fold( + fun ({ChPid, Consumer}, _P, Acc1) -> + #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, + [{ChPid, CTag, Ack, Args} | Acc1] + end, Acc, Consumers). + unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -erase_ch(ChPid, Consumers) -> - case lookup_ch(ChPid) of - not_found -> - not_found; - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - blocked_consumers = BlockedQ} -> - AllConsumers = priority_queue:join(Consumers, BlockedQ), - ok = erase_ch_record(C), - {queue:to_list(ChAckTags), - tags(priority_queue:to_list(AllConsumers)), - remove_consumers(ChPid, Consumers)} - end. - add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, Drained, Consumers) -> C = #cr{consumer_count = Count, @@ -110,32 +108,21 @@ remove(ChPid, ConsumerTag, Consumers) -> remove_consumer(ChPid, ConsumerTag, Consumers) end. -send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()]. - -record_ack(ChPid, LimiterPid, AckTag) -> - C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), - update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), - ok. - -subtract_acks(ChPid, AckTags) -> +erase_ch(ChPid, Consumers) -> case lookup_ch(ChPid) of not_found -> not_found; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - ok + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + blocked_consumers = BlockedQ} -> + AllConsumers = priority_queue:join(Consumers, BlockedQ), + ok = erase_ch_record(C), + {queue:to_list(ChAckTags), + tags(priority_queue:to_list(AllConsumers)), + remove_consumers(ChPid, Consumers)} end. -subtract_acks([], [], AckQ) -> - AckQ; -subtract_acks([], Prefix, AckQ) -> - queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> - case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) - end. +send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()]. deliver(DeliverFun, Stop, QName, S, Consumers) -> deliver(DeliverFun, Stop, QName, [], S, Consumers). @@ -148,12 +135,13 @@ deliver( DeliverFun, false, QName, Blocked, S, Consumers) -> {false, Blocked, S, Consumers}; {{value, QEntry, Priority}, Tail} -> {Stop, Blocked1, S1, Consumers1} = - deliver1(DeliverFun, QEntry, Priority, QName, Blocked, S, Tail), + deliver_to_consumer(DeliverFun, QEntry, Priority, QName, + Blocked, S, Tail), deliver(DeliverFun, Stop, QName, Blocked1, S1, Consumers1) end. -deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName, - Blocked, S, Consumers) -> +deliver_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, QName, + Blocked, S, Consumers) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), @@ -167,7 +155,7 @@ deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName, Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], {false, Blocked1, S, Consumers}; {continue, Limiter} -> - {Stop, S1} = deliver1( + {Stop, S1} = deliver_to_consumer( DeliverFun, Consumer, C#cr{limiter = Limiter}, QName, S), {Stop, Blocked, S1, @@ -175,13 +163,13 @@ deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName, end end. -deliver1(DeliverFun, - #consumer{tag = ConsumerTag, - ack_required = AckRequired}, - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - unsent_message_count = Count}, - QName, S) -> +deliver_to_consumer(DeliverFun, + #consumer{tag = ConsumerTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + QName, S) -> {{Message, IsDelivered, AckTag}, Stop, S1} = DeliverFun(AckRequired, S), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), @@ -193,6 +181,31 @@ deliver1(DeliverFun, unsent_message_count = Count + 1}), {Stop, S1}. +record_ack(ChPid, LimiterPid, AckTag) -> + C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), + update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), + ok. + +subtract_acks(ChPid, AckTags) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{acktags = ChAckTags} -> + update_ch_record( + C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), + ok + end. + +subtract_acks([], [], AckQ) -> + AckQ; +subtract_acks([], Prefix, AckQ) -> + queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); +subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> + case queue:out(AckQ) of + {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); + {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + end. + possibly_unblock(Update, ChPid, Consumers) -> case lookup_ch(ChPid) of not_found -> unchanged; @@ -246,20 +259,6 @@ credit_fun(IsEmpty, Credit, Drain, CTag) -> end end. -count() -> - lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). - -all(Consumers) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, - consumers(Consumers, []), all_ch_record()). - -consumers(Consumers, Acc) -> - priority_queue:fold( - fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] - end, Acc, Consumers). - %%---------------------------------------------------------------------------- lookup_ch(ChPid) -> |