diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-07 20:54:13 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-07 20:54:13 +0000 |
commit | caa0db32425f5292382db52f23d7007a76aabf29 (patch) | |
tree | 1c2bcb3be9e18b400294a93999b3f814819fed32 | |
parent | 5ae1074655a4afab7c14a719e74bca06929a0464 (diff) | |
download | rabbitmq-server-caa0db32425f5292382db52f23d7007a76aabf29.tar.gz |
simplify rabbit_queue_consumers:deliver even further
turns out we can ditch another parameter to both that function and the
FetchFun we pass on.
Also reverted state shrinking in attempt_delivery call site. While it
is the right thing in principle, I am not convinced the resulting code
is actually easier to grok.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 30 |
2 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b5922bb6..dd2a90e3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -458,12 +458,11 @@ run_message_queue(Blocked, State = #q{consumers = Consumers}) -> case is_empty(State) of true -> blocked(lists:append(Blocked), Consumers, State); false -> case rabbit_queue_consumers:deliver( - fun(AckRequired, State1) -> - fetch(AckRequired, State1) - end, qname(State), State, Consumers) of - {delivered, MoreBlocked, State2, Consumers1} -> + fun(AckRequired) -> fetch(AckRequired, State) end, + qname(State), Consumers) of + {delivered, MoreBlocked, State1, Consumers1} -> run_message_queue([MoreBlocked | Blocked], - State2#q{consumers = Consumers1}); + State1#q{consumers = Consumers1}); {undelivered, MoreBlocked, Consumers1} -> blocked(lists:append([MoreBlocked | Blocked]), Consumers1, State) @@ -474,19 +473,16 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case rabbit_queue_consumers:deliver( - fun (true, BQS1) -> - true = BQ:is_empty(BQS1), - {AckTag, BQS2} = BQ:publish_delivered( - Message, Props, SenderPid, BQS1), - {{Message, Delivered, AckTag}, {ack, BQS2}}; - (false, _BQS) -> - {{Message, Delivered, undefined}, noack} - end, qname(State), BQS, State#q.consumers) of - {delivered, Blocked, {ack, BQS3}, Consumers} -> - {delivered, blocked(Blocked, Consumers, - State#q{backing_queue_state = BQS3})}; - {delivered, Blocked, noack, Consumers} -> - {delivered, discard(Delivery, blocked(Blocked, Consumers, State))}; + fun (true) -> true = BQ:is_empty(BQS), + {AckTag, BQS1} = BQ:publish_delivered( + Message, Props, SenderPid, BQS), + {{Message, Delivered, AckTag}, + State#q{backing_queue_state = BQS1}}; + (false) -> {{Message, Delivered, undefined}, + discard(Delivery, State)} + end, qname(State), State#q.consumers) of + {delivered, Blocked, State1, Consumers} -> + {delivered, blocked(Blocked, Consumers, State1)}; {undelivered, Blocked, Consumers} -> {undelivered, blocked(Blocked, Consumers, State)} end. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 38b1bad9..0e1122f9 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, - send_drained/0, deliver/4, record_ack/3, subtract_acks/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/2, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, utilisation/1]). @@ -76,8 +76,8 @@ 'not_found' | {[ack()], [rabbit_types:ctag()], state()}. -spec send_drained() -> 'ok'. --spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), - rabbit_amqqueue:name(), T, state()) -> +-spec deliver(fun ((boolean()) -> {fetch_result(), T}), + rabbit_amqqueue:name(), state()) -> {'delivered', [{ch(), rabbit_types:ctag()}], T, state()} | {'undelivered', [{ch(), rabbit_types:ctag()}], state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. @@ -182,28 +182,28 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, QName, S, State) -> - deliver(FetchFun, QName, [], S, State). +deliver(FetchFun, QName, State) -> + deliver(FetchFun, QName, [], State). -deliver(FetchFun, QName, Blocked, S, State = #state{consumers = Consumers}) -> +deliver(FetchFun, QName, Blocked, State = #state{consumers = Consumers}) -> case priority_queue:out_p(Consumers) of {empty, _} -> {undelivered, Blocked, State#state{use = update_use(State#state.use, inactive)}}; {{value, QEntry = {ChPid, Consumer}, Priority}, Tail} -> - case deliver_to_consumer(FetchFun, QEntry, QName, S) of - {delivered, S1} -> - {delivered, Blocked, S1, + case deliver_to_consumer(FetchFun, QEntry, QName) of + {delivered, R} -> + {delivered, Blocked, R, State#state{consumers = priority_queue:in(QEntry, Priority, Tail)}}; undelivered -> deliver(FetchFun, QName, - [{ChPid, Consumer#consumer.tag} | Blocked], S, + [{ChPid, Consumer#consumer.tag} | Blocked], State#state{consumers = Tail}) end end. -deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName, S) -> +deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), @@ -217,7 +217,7 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName, S) -> {continue, Limiter} -> {delivered, deliver_to_consumer( FetchFun, Consumer, - C#cr{limiter = Limiter}, QName, S)} + C#cr{limiter = Limiter}, QName)} end end. @@ -227,8 +227,8 @@ deliver_to_consumer(FetchFun, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, - QName, S) -> - {{Message, IsDelivered, AckTag}, S1} = FetchFun(AckRequired, S), + QName) -> + {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of @@ -237,7 +237,7 @@ deliver_to_consumer(FetchFun, end, update_ch_record(C#cr{acktags = ChAckTags1, unsent_message_count = Count + 1}), - S1. + R. record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), |