diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-08 09:42:20 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-08 09:42:20 +0000 |
commit | a10df566f0632480665385b023ae6486d5e1f723 (patch) | |
tree | 658b029aa844c04b7e5e322b1adec540f2a24282 /src/rabbit_queue_consumers.erl | |
parent | 6d0b781e9ddb807c037151796ddd9f294b4b0bf7 (diff) | |
parent | 880b5d98c0b89a05a0b7349dca0f014bead8e071 (diff) | |
download | rabbitmq-server-a10df566f0632480665385b023ae6486d5e1f723.tar.gz |
merge bug25937 into bug25948
and largely rewrite it in the process
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 23 |
1 files changed, 10 insertions, 13 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 0e1122f9..f06423f7 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -78,13 +78,12 @@ -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean()) -> {fetch_result(), T}), rabbit_amqqueue:name(), state()) -> - {'delivered', [{ch(), rabbit_types:ctag()}], T, state()} | - {'undelivered', [{ch(), rabbit_types:ctag()}], state()}. + {'delivered', boolean(), T, state()} | + {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> - 'unchanged' | - {'unblocked', [rabbit_types:ctag()], state()}. + 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). -spec notify_sent_fun(non_neg_integer()) -> cr_fun(). -spec activate_limit_fun() -> cr_fun(). @@ -182,23 +181,22 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, QName, State) -> - deliver(FetchFun, QName, [], State). +deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). -deliver(FetchFun, QName, Blocked, State = #state{consumers = Consumers}) -> +deliver(FetchFun, QName, ConsumersChanged, + State = #state{consumers = Consumers}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {undelivered, Blocked, + {undelivered, ConsumersChanged, State#state{use = update_use(State#state.use, inactive)}}; - {{value, QEntry = {ChPid, Consumer}, Priority}, Tail} -> + {{value, QEntry, Priority}, Tail} -> case deliver_to_consumer(FetchFun, QEntry, QName) of {delivered, R} -> - {delivered, Blocked, R, + {delivered, ConsumersChanged, R, State#state{consumers = priority_queue:in(QEntry, Priority, Tail)}}; undelivered -> - deliver(FetchFun, QName, - [{ChPid, Consumer#consumer.tag} | Blocked], + deliver(FetchFun, QName, true, State#state{consumers = Tail}) end end. @@ -289,7 +287,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, - tags(Unblocked), State#state{consumers = priority_queue:join(Consumers, UnblockedQ), use = update_use(Use, active)}} end. |