summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 13:16:58 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 13:16:58 +0000
commit880b5d98c0b89a05a0b7349dca0f014bead8e071 (patch)
treea59631a98bf5ff0b99bb431b4d151edfbbef8e72
parentd0a11dbfd0074236eba28a0ae37b18cb32832d92 (diff)
downloadrabbitmq-server-880b5d98c0b89a05a0b7349dca0f014bead8e071.tar.gz
Even clearer?
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_queue_consumers.erl28
2 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1b06dac3..8bc50bf7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -405,13 +405,13 @@ maybe_send_drained(WasEmpty, State) ->
State.
deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
- {Active, Blocked, State1, Consumers1} =
+ {Active, ACResult, State1, Consumers1} =
rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State,
Consumers),
State2 = State1#q{consumers = Consumers1},
- case Blocked of
- something_became_blocked -> notify_decorators(State2);
- nothing_became_blocked -> ok
+ case ACResult of
+ active_consumers_changed -> notify_decorators(State2);
+ active_consumers_unchanged -> ok
end,
{Active, State2}.
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 818a3087..0d65c48f 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -59,7 +59,7 @@
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
-type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
--type blocked_result() :: 'nothing_became_blocked' | 'something_became_blocked'.
+-type ac_result() :: 'active_consumers_unchanged' | 'active_consumers_changed'.
-spec new() -> state().
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
@@ -79,7 +79,7 @@
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
boolean(), rabbit_amqqueue:name(), T, state())
- -> {blocked_result(), [{ch(), rabbit_types:ctag()}], T, state()}.
+ -> {ac_result(), [{ch(), rabbit_types:ctag()}], T, state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
@@ -182,40 +182,40 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
deliver(FetchFun, Stop, QName, S, State) ->
- deliver(FetchFun, Stop, QName, nothing_became_blocked, S, State).
+ deliver(FetchFun, Stop, QName, active_consumers_unchanged, S, State).
-deliver(_FetchFun, true, _QName, Blocked, S, State) ->
- {true, Blocked, S, State};
-deliver( FetchFun, false, QName, Blocked, S,
+deliver(_FetchFun, true, _QName, ACResult, S, State) ->
+ {true, ACResult, S, State};
+deliver( FetchFun, false, QName, ACResult, S,
State = #state{consumers = Consumers, use = Use}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
+ {false, ACResult, S, State#state{use = update_use(Use, inactive)}};
{{value, QEntry, Priority}, Tail} ->
- {Stop, Blocked1, S1, Consumers1} =
+ {Stop, ACResult1, S1, Consumers1} =
deliver_to_consumer(FetchFun, QEntry, Priority, QName,
- Blocked, S, Tail),
- deliver(FetchFun, Stop, QName, Blocked1, S1,
+ ACResult, S, Tail),
+ deliver(FetchFun, Stop, QName, ACResult1, S1,
State#state{consumers = Consumers1})
end.
deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
- Blocked, S, Consumers) ->
+ ACResult, S, Consumers) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- {false, something_became_blocked, S, Consumers};
+ {false, active_consumers_changed, S, Consumers};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
- {false, something_became_blocked, S, Consumers};
+ {false, active_consumers_changed, S, Consumers};
{continue, Limiter} ->
{Stop, S1} = deliver_to_consumer(
FetchFun, Consumer,
C#cr{limiter = Limiter}, QName, S),
- {Stop, Blocked, S1,
+ {Stop, ACResult, S1,
priority_queue:in(E, Priority, Consumers)}
end
end.