diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-06 16:54:08 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-06 16:54:08 +0000 |
commit | c82c5d3168489bd7c9f72eece48578a7c9d3a270 (patch) | |
tree | 0c0a8891480674960e92084c013d0a24fc6e369b /src/rabbit_queue_consumers.erl | |
parent | dc767c96d9150a143d15bf0dbf5cd836596ccde5 (diff) | |
download | rabbitmq-server-c82c5d3168489bd7c9f72eece48578a7c9d3a270.tar.gz |
refactor: prepare for more state
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 81 |
1 files changed, 45 insertions, 36 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index d5d2a150..40daec32 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -26,6 +26,8 @@ -define(UNSENT_MESSAGE_LIMIT, 200). +-record(state, {consumers}). + -record(consumer, {tag, ack_required, args}). %% These are held in our process dictionary @@ -45,37 +47,37 @@ -ifdef(use_specs). --type consumers() :: priority_queue:q(). +-type state() :: #state{consumers ::priority_queue:q()}. -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). -type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. --spec new() -> consumers(). --spec max_active_priority(consumers()) -> integer() | 'infinity' | 'empty'. --spec inactive(consumers()) -> boolean(). --spec all(consumers()) -> [{ch(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]. +-spec new() -> state(). +-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. +-spec inactive(state()) -> boolean(). +-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]. -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), credit_args(), rabbit_framing:amqp_table(), boolean(), - consumers()) -> consumers(). --spec remove(ch(), rabbit_types:ctag(), consumers()) -> - 'not_found' | consumers(). --spec erase_ch(ch(), consumers()) -> + state()) -> state(). +-spec remove(ch(), rabbit_types:ctag(), state()) -> + 'not_found' | state(). +-spec erase_ch(ch(), state()) -> 'not_found' | {[ack()], [rabbit_types:ctag()], - consumers()}. + state()}. -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), - boolean(), rabbit_amqqueue:name(), T, consumers()) -> - {boolean(), [{ch(), rabbit_types:ctag()}], T, consumers()}. + boolean(), rabbit_amqqueue:name(), T, state()) -> + {boolean(), [{ch(), rabbit_types:ctag()}], T, state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. --spec possibly_unblock(cr_fun(), ch(), consumers()) -> +-spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | - {'unblocked', [rabbit_types:ctag()], consumers()}. + {'unblocked', [rabbit_types:ctag()], state()}. -spec resume_fun() -> cr_fun(). -spec notify_sent_fun(non_neg_integer()) -> cr_fun(). -spec activate_limit_fun() -> cr_fun(). @@ -86,13 +88,15 @@ %%---------------------------------------------------------------------------- -new() -> priority_queue:new(). +new() -> #state{consumers = priority_queue:new()}. -max_active_priority(Consumers) -> priority_queue:highest(Consumers). +max_active_priority(#state{consumers = Consumers}) -> + priority_queue:highest(Consumers). -inactive(Consumers) -> priority_queue:is_empty(Consumers). +inactive(#state{consumers = Consumers}) -> + priority_queue:is_empty(Consumers). -all(Consumers) -> +all(#state{consumers = Consumers}) -> lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, consumers(Consumers, []), all_ch_record()). @@ -109,7 +113,7 @@ unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, - Drained, Consumers) -> + Drained, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -130,9 +134,9 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck, args = OtherArgs}, - add_consumer({ChPid, Consumer}, Consumers). + State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. -remove(ChPid, ConsumerTag, Consumers) -> +remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> not_found; @@ -148,10 +152,11 @@ remove(ChPid, ConsumerTag, Consumers) -> update_ch_record(C#cr{consumer_count = Count - 1, limiter = Limiter2, blocked_consumers = Blocked1}), - remove_consumer(ChPid, ConsumerTag, Consumers) + State#state{consumers = + remove_consumer(ChPid, ConsumerTag, Consumers)} end. -erase_ch(ChPid, Consumers) -> +erase_ch(ChPid, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> not_found; @@ -162,26 +167,28 @@ erase_ch(ChPid, Consumers) -> ok = erase_ch_record(C), {queue:to_list(ChAckTags), tags(priority_queue:to_list(AllConsumers)), - remove_consumers(ChPid, Consumers)} + State#state{consumers = remove_consumers(ChPid, Consumers)}} end. send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, Stop, QName, S, Consumers) -> - deliver(FetchFun, Stop, QName, [], S, Consumers). +deliver(FetchFun, Stop, QName, S, State) -> + deliver(FetchFun, Stop, QName, [], S, State). -deliver(_FetchFun, true, _QName, Blocked, S, Consumers) -> - {true, Blocked, S, Consumers}; -deliver( FetchFun, false, QName, Blocked, S, Consumers) -> +deliver(_FetchFun, true, _QName, Blocked, S, State) -> + {true, Blocked, S, State}; +deliver( FetchFun, false, QName, Blocked, S, State = #state{ + consumers = Consumers}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, Consumers}; + {false, Blocked, S, State}; {{value, QEntry, Priority}, Tail} -> {Stop, Blocked1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, Blocked, S, Tail), - deliver(FetchFun, Stop, QName, Blocked1, S1, Consumers1) + deliver(FetchFun, Stop, QName, Blocked1, S1, + State#state{consumers = Consumers1}) end. deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, @@ -250,18 +257,19 @@ subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) end. -possibly_unblock(Update, ChPid, Consumers) -> +possibly_unblock(Update, ChPid, State) -> case lookup_ch(ChPid) of not_found -> unchanged; C -> C1 = Update(C), case is_ch_blocked(C) andalso not is_ch_blocked(C1) of false -> update_ch_record(C1), unchanged; - true -> unblock(C1, Consumers) + true -> unblock(C1, State) end end. -unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, Consumers) -> +unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, + State = #state{consumers = Consumers}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -275,7 +283,8 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, Consumers) -> update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, tags(Unblocked), - priority_queue:join(Consumers, UnblockedQ)} + State#state{consumers = + priority_queue:join(Consumers, UnblockedQ)}} end. resume_fun() -> |