summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-08 09:42:20 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-08 09:42:20 +0000
commita10df566f0632480665385b023ae6486d5e1f723 (patch)
tree658b029aa844c04b7e5e322b1adec540f2a24282 /src/rabbit_queue_consumers.erl
parent6d0b781e9ddb807c037151796ddd9f294b4b0bf7 (diff)
parent880b5d98c0b89a05a0b7349dca0f014bead8e071 (diff)
downloadrabbitmq-server-a10df566f0632480665385b023ae6486d5e1f723.tar.gz
merge bug25937 into bug25948
and largely rewrite it in the process
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl23
1 files changed, 10 insertions, 13 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 0e1122f9..f06423f7 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -78,13 +78,12 @@
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
rabbit_amqqueue:name(), state()) ->
- {'delivered', [{ch(), rabbit_types:ctag()}], T, state()} |
- {'undelivered', [{ch(), rabbit_types:ctag()}], state()}.
+ {'delivered', boolean(), T, state()} |
+ {'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
- 'unchanged' |
- {'unblocked', [rabbit_types:ctag()], state()}.
+ 'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
-spec activate_limit_fun() -> cr_fun().
@@ -182,23 +181,22 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, State) ->
- deliver(FetchFun, QName, [], State).
+deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
-deliver(FetchFun, QName, Blocked, State = #state{consumers = Consumers}) ->
+deliver(FetchFun, QName, ConsumersChanged,
+ State = #state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {undelivered, Blocked,
+ {undelivered, ConsumersChanged,
State#state{use = update_use(State#state.use, inactive)}};
- {{value, QEntry = {ChPid, Consumer}, Priority}, Tail} ->
+ {{value, QEntry, Priority}, Tail} ->
case deliver_to_consumer(FetchFun, QEntry, QName) of
{delivered, R} ->
- {delivered, Blocked, R,
+ {delivered, ConsumersChanged, R,
State#state{consumers = priority_queue:in(QEntry, Priority,
Tail)}};
undelivered ->
- deliver(FetchFun, QName,
- [{ChPid, Consumer#consumer.tag} | Blocked],
+ deliver(FetchFun, QName, true,
State#state{consumers = Tail})
end
end.
@@ -289,7 +287,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
- tags(Unblocked),
State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
use = update_use(Use, active)}}
end.