summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-07 20:54:13 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-07 20:54:13 +0000
commitcaa0db32425f5292382db52f23d7007a76aabf29 (patch)
tree1c2bcb3be9e18b400294a93999b3f814819fed32 /src/rabbit_queue_consumers.erl
parent5ae1074655a4afab7c14a719e74bca06929a0464 (diff)
downloadrabbitmq-server-caa0db32425f5292382db52f23d7007a76aabf29.tar.gz
simplify rabbit_queue_consumers:deliver even further
turns out we can ditch another parameter to both that function and the FetchFun we pass on. Also reverted state shrinking in attempt_delivery call site. While it is the right thing in principle, I am not convinced the resulting code is actually easier to grok.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl30
1 files changed, 15 insertions, 15 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 38b1bad9..0e1122f9 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
- send_drained/0, deliver/4, record_ack/3, subtract_acks/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
utilisation/1]).
@@ -76,8 +76,8 @@
'not_found' | {[ack()], [rabbit_types:ctag()],
state()}.
-spec send_drained() -> 'ok'.
--spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
- rabbit_amqqueue:name(), T, state()) ->
+-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
+ rabbit_amqqueue:name(), state()) ->
{'delivered', [{ch(), rabbit_types:ctag()}], T, state()} |
{'undelivered', [{ch(), rabbit_types:ctag()}], state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
@@ -182,28 +182,28 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, S, State) ->
- deliver(FetchFun, QName, [], S, State).
+deliver(FetchFun, QName, State) ->
+ deliver(FetchFun, QName, [], State).
-deliver(FetchFun, QName, Blocked, S, State = #state{consumers = Consumers}) ->
+deliver(FetchFun, QName, Blocked, State = #state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, Blocked,
State#state{use = update_use(State#state.use, inactive)}};
{{value, QEntry = {ChPid, Consumer}, Priority}, Tail} ->
- case deliver_to_consumer(FetchFun, QEntry, QName, S) of
- {delivered, S1} ->
- {delivered, Blocked, S1,
+ case deliver_to_consumer(FetchFun, QEntry, QName) of
+ {delivered, R} ->
+ {delivered, Blocked, R,
State#state{consumers = priority_queue:in(QEntry, Priority,
Tail)}};
undelivered ->
deliver(FetchFun, QName,
- [{ChPid, Consumer#consumer.tag} | Blocked], S,
+ [{ChPid, Consumer#consumer.tag} | Blocked],
State#state{consumers = Tail})
end
end.
-deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName, S) ->
+deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
@@ -217,7 +217,7 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName, S) ->
{continue, Limiter} ->
{delivered, deliver_to_consumer(
FetchFun, Consumer,
- C#cr{limiter = Limiter}, QName, S)}
+ C#cr{limiter = Limiter}, QName)}
end
end.
@@ -227,8 +227,8 @@ deliver_to_consumer(FetchFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- QName, S) ->
- {{Message, IsDelivered, AckTag}, S1} = FetchFun(AckRequired, S),
+ QName) ->
+ {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -237,7 +237,7 @@ deliver_to_consumer(FetchFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- S1.
+ R.
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),