summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-07 20:54:13 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-07 20:54:13 +0000
commitcaa0db32425f5292382db52f23d7007a76aabf29 (patch)
tree1c2bcb3be9e18b400294a93999b3f814819fed32
parent5ae1074655a4afab7c14a719e74bca06929a0464 (diff)
downloadrabbitmq-server-caa0db32425f5292382db52f23d7007a76aabf29.tar.gz
simplify rabbit_queue_consumers:deliver even further
turns out we can ditch another parameter to both that function and the FetchFun we pass on. Also reverted state shrinking in attempt_delivery call site. While it is the right thing in principle, I am not convinced the resulting code is actually easier to grok.
-rw-r--r--src/rabbit_amqqueue_process.erl32
-rw-r--r--src/rabbit_queue_consumers.erl30
2 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b5922bb6..dd2a90e3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -458,12 +458,11 @@ run_message_queue(Blocked, State = #q{consumers = Consumers}) ->
case is_empty(State) of
true -> blocked(lists:append(Blocked), Consumers, State);
false -> case rabbit_queue_consumers:deliver(
- fun(AckRequired, State1) ->
- fetch(AckRequired, State1)
- end, qname(State), State, Consumers) of
- {delivered, MoreBlocked, State2, Consumers1} ->
+ fun(AckRequired) -> fetch(AckRequired, State) end,
+ qname(State), Consumers) of
+ {delivered, MoreBlocked, State1, Consumers1} ->
run_message_queue([MoreBlocked | Blocked],
- State2#q{consumers = Consumers1});
+ State1#q{consumers = Consumers1});
{undelivered, MoreBlocked, Consumers1} ->
blocked(lists:append([MoreBlocked | Blocked]),
Consumers1, State)
@@ -474,19 +473,16 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
case rabbit_queue_consumers:deliver(
- fun (true, BQS1) ->
- true = BQ:is_empty(BQS1),
- {AckTag, BQS2} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS1),
- {{Message, Delivered, AckTag}, {ack, BQS2}};
- (false, _BQS) ->
- {{Message, Delivered, undefined}, noack}
- end, qname(State), BQS, State#q.consumers) of
- {delivered, Blocked, {ack, BQS3}, Consumers} ->
- {delivered, blocked(Blocked, Consumers,
- State#q{backing_queue_state = BQS3})};
- {delivered, Blocked, noack, Consumers} ->
- {delivered, discard(Delivery, blocked(Blocked, Consumers, State))};
+ fun (true) -> true = BQ:is_empty(BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS),
+ {{Message, Delivered, AckTag},
+ State#q{backing_queue_state = BQS1}};
+ (false) -> {{Message, Delivered, undefined},
+ discard(Delivery, State)}
+ end, qname(State), State#q.consumers) of
+ {delivered, Blocked, State1, Consumers} ->
+ {delivered, blocked(Blocked, Consumers, State1)};
{undelivered, Blocked, Consumers} ->
{undelivered, blocked(Blocked, Consumers, State)}
end.
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 38b1bad9..0e1122f9 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
- send_drained/0, deliver/4, record_ack/3, subtract_acks/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
utilisation/1]).
@@ -76,8 +76,8 @@
'not_found' | {[ack()], [rabbit_types:ctag()],
state()}.
-spec send_drained() -> 'ok'.
--spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
- rabbit_amqqueue:name(), T, state()) ->
+-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
+ rabbit_amqqueue:name(), state()) ->
{'delivered', [{ch(), rabbit_types:ctag()}], T, state()} |
{'undelivered', [{ch(), rabbit_types:ctag()}], state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
@@ -182,28 +182,28 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, S, State) ->
- deliver(FetchFun, QName, [], S, State).
+deliver(FetchFun, QName, State) ->
+ deliver(FetchFun, QName, [], State).
-deliver(FetchFun, QName, Blocked, S, State = #state{consumers = Consumers}) ->
+deliver(FetchFun, QName, Blocked, State = #state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, Blocked,
State#state{use = update_use(State#state.use, inactive)}};
{{value, QEntry = {ChPid, Consumer}, Priority}, Tail} ->
- case deliver_to_consumer(FetchFun, QEntry, QName, S) of
- {delivered, S1} ->
- {delivered, Blocked, S1,
+ case deliver_to_consumer(FetchFun, QEntry, QName) of
+ {delivered, R} ->
+ {delivered, Blocked, R,
State#state{consumers = priority_queue:in(QEntry, Priority,
Tail)}};
undelivered ->
deliver(FetchFun, QName,
- [{ChPid, Consumer#consumer.tag} | Blocked], S,
+ [{ChPid, Consumer#consumer.tag} | Blocked],
State#state{consumers = Tail})
end
end.
-deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName, S) ->
+deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
@@ -217,7 +217,7 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName, S) ->
{continue, Limiter} ->
{delivered, deliver_to_consumer(
FetchFun, Consumer,
- C#cr{limiter = Limiter}, QName, S)}
+ C#cr{limiter = Limiter}, QName)}
end
end.
@@ -227,8 +227,8 @@ deliver_to_consumer(FetchFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- QName, S) ->
- {{Message, IsDelivered, AckTag}, S1} = FetchFun(AckRequired, S),
+ QName) ->
+ {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -237,7 +237,7 @@ deliver_to_consumer(FetchFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- S1.
+ R.
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),