diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 13:16:58 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 13:16:58 +0000 |
commit | 880b5d98c0b89a05a0b7349dca0f014bead8e071 (patch) | |
tree | a59631a98bf5ff0b99bb431b4d151edfbbef8e72 | |
parent | d0a11dbfd0074236eba28a0ae37b18cb32832d92 (diff) | |
download | rabbitmq-server-880b5d98c0b89a05a0b7349dca0f014bead8e071.tar.gz |
Even clearer?
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 28 |
2 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1b06dac3..8bc50bf7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -405,13 +405,13 @@ maybe_send_drained(WasEmpty, State) -> State. deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> - {Active, Blocked, State1, Consumers1} = + {Active, ACResult, State1, Consumers1} = rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State, Consumers), State2 = State1#q{consumers = Consumers1}, - case Blocked of - something_became_blocked -> notify_decorators(State2); - nothing_became_blocked -> ok + case ACResult of + active_consumers_changed -> notify_decorators(State2); + active_consumers_unchanged -> ok end, {Active, State2}. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 818a3087..0d65c48f 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -59,7 +59,7 @@ -type cr_fun() :: fun ((#cr{}) -> #cr{}). -type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. --type blocked_result() :: 'nothing_became_blocked' | 'something_became_blocked'. +-type ac_result() :: 'active_consumers_unchanged' | 'active_consumers_changed'. -spec new() -> state(). -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. @@ -79,7 +79,7 @@ -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), boolean(), rabbit_amqqueue:name(), T, state()) - -> {blocked_result(), [{ch(), rabbit_types:ctag()}], T, state()}. + -> {ac_result(), [{ch(), rabbit_types:ctag()}], T, state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> @@ -182,40 +182,40 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. deliver(FetchFun, Stop, QName, S, State) -> - deliver(FetchFun, Stop, QName, nothing_became_blocked, S, State). + deliver(FetchFun, Stop, QName, active_consumers_unchanged, S, State). -deliver(_FetchFun, true, _QName, Blocked, S, State) -> - {true, Blocked, S, State}; -deliver( FetchFun, false, QName, Blocked, S, +deliver(_FetchFun, true, _QName, ACResult, S, State) -> + {true, ACResult, S, State}; +deliver( FetchFun, false, QName, ACResult, S, State = #state{consumers = Consumers, use = Use}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; + {false, ACResult, S, State#state{use = update_use(Use, inactive)}}; {{value, QEntry, Priority}, Tail} -> - {Stop, Blocked1, S1, Consumers1} = + {Stop, ACResult1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, - Blocked, S, Tail), - deliver(FetchFun, Stop, QName, Blocked1, S1, + ACResult, S, Tail), + deliver(FetchFun, Stop, QName, ACResult1, S1, State#state{consumers = Consumers1}) end. deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, - Blocked, S, Consumers) -> + ACResult, S, Consumers) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {false, something_became_blocked, S, Consumers}; + {false, active_consumers_changed, S, Consumers}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), - {false, something_became_blocked, S, Consumers}; + {false, active_consumers_changed, S, Consumers}; {continue, Limiter} -> {Stop, S1} = deliver_to_consumer( FetchFun, Consumer, C#cr{limiter = Limiter}, QName, S), - {Stop, Blocked, S1, + {Stop, ACResult, S1, priority_queue:in(E, Priority, Consumers)} end end. |