diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-08 08:58:50 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-08 08:58:50 +0000 |
commit | 18f4d00fedb9af94aa520341ced1e07a11addd85 (patch) | |
tree | 369fc274d7a9af8c1235a166178c39cc95875316 | |
parent | 8e61223c2e104ba2813aacf462e8406d8e7e66fa (diff) | |
download | rabbitmq-server-18f4d00fedb9af94aa520341ced1e07a11addd85.tar.gz |
refactor: remove cruft
The equivalence proof is left as an exercise to the reader.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 |
1 files changed, 24 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c4921510..5629b836 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -388,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) -> {_, _} -> ok end. -deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, +deliver_msgs_to_consumers(_DeliverFun, true, State) -> + {true, State}; +deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case PredFun(FunAcc, State) of - false -> {FunAcc, State}; - true -> case queue:out(ActiveConsumers) of - {empty, _} -> - {FunAcc, State}; - {{value, QEntry}, Tail} -> - {FunAcc1, State1} = - deliver_msg_to_consumer( + case queue:out(ActiveConsumers) of + {empty, _} -> + {false, State}; + {{value, QEntry}, Tail} -> + {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, - FunAcc, State#q{active_consumers = Tail}), - deliver_msgs_to_consumers(Funs, FunAcc1, State1) - end + State#q{active_consumers = Tail}), + deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {FunAcc, State}; + {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, self(), Consumer#consumer.ack_required) of false -> block_consumer(C#cr{is_limit_active = true}, E), - {FunAcc, State}; + {false, State}; true -> AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( - DeliverFun, Consumer, C, FunAcc, + DeliverFun, Consumer, C, State#q{active_consumers = AC1}) end end. @@ -426,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, - FunAcc, State = #q{q = #amqqueue{name = QName}}) -> - {{Message, IsDelivered, AckTag}, FunAcc1, State1} = - DeliverFun(AckRequired, FunAcc, State), + State = #q{q = #amqqueue{name = QName}}) -> + {{Message, IsDelivered, AckTag}, Stop, State1} = + DeliverFun(AckRequired, State), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of @@ -437,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun, end, update_ch_record(C#cr{acktags = ChAckTags1, unsent_message_count = Count + 1}), - {FunAcc1, State1}. - -deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. + {Stop, State1}. -deliver_from_queue_deliver(AckRequired, false, State) -> +deliver_from_queue_deliver(AckRequired, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. @@ -485,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) -> State. run_message_queue(State) -> - Funs = {fun deliver_from_queue_pred/2, - fun deliver_from_queue_deliver/3}, State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), - IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + {_IsEmpty1, State2} = deliver_msgs_to_consumers( + fun deliver_from_queue_deliver/2, + BQ:is_empty(BQS), State1), State2. attempt_delivery(Delivery = #delivery{sender = ChPid, @@ -504,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = - fun (AckRequired, false, - State1 = #q{backing_queue_state = BQS2}) -> + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> %% we don't need an expiry here because %% messages are not being enqueued, so we use %% an empty message_properties. @@ -521,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, State1#q{backing_queue_state = BQS3}} end, {Delivered, State2} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + deliver_msgs_to_consumers(DeliverFun, false, State#q{backing_queue_state = BQS1}), {Delivered, Confirm, State2}; {Duplicate, BQS1} -> |