diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-17 14:10:17 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-17 14:10:17 +0100 |
commit | b2722146d4e3eb4b80f952a2dc01f631e78a3e2d (patch) | |
tree | aba19b9927ccf5456b2e3fa34bf24518f42ef882 | |
parent | 67af217a39218ee8cf5e344786e8166c821adcdb (diff) | |
download | rabbitmq-server-b2722146d4e3eb4b80f952a2dc01f631e78a3e2d.tar.gz |
adjusted HO-ness in deliver queue beautifully. Thus in the deliver_from_queue case, we now reduce n calls to mixed_queue:is_empty to 1 call and pass around the remaining count as the acc. l33t
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 87 |
1 files changed, 45 insertions, 42 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 620b497b..0ab44a53 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -170,7 +170,7 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_queue(Fun, FunAcc0, +deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers, @@ -182,12 +182,12 @@ deliver_queue(Fun, FunAcc0, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - IsMsgReady = Fun(is_message_ready, FunAcc0, State), + IsMsgReady = PredFun(FunAcc0, State), case (IsMsgReady andalso rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> - {{Msg, IsDelivered, AckTag, Remaining}, FunAcc1, State2} = - Fun(AckRequired, FunAcc0, State), + {{Msg, IsDelivered, AckTag}, FunAcc1, State2} = + DeliverFun(AckRequired, FunAcc0, State), ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, @@ -217,10 +217,7 @@ deliver_queue(Fun, FunAcc0, blocked_consumers = NewBlockedConsumers, next_msg_id = NextId + 1 }, - case Remaining of - 0 -> {FunAcc1, State3}; - _ -> deliver_queue(Fun, FunAcc1, State3) - end; + deliver_queue(Funs, FunAcc1, State3); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), @@ -229,7 +226,7 @@ deliver_queue(Fun, FunAcc0, ActiveConsumers, BlockedConsumers), deliver_queue( - Fun, FunAcc0, + Funs, FunAcc0, State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}); false -> @@ -240,30 +237,35 @@ deliver_queue(Fun, FunAcc0, {FunAcc0, State} end. -deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) -> - not rabbit_mixed_queue:is_empty(MS); -deliver_from_queue(AckRequired, Acc = undefined, - State = #q { mixed_state = MS }) -> +deliver_from_queue_pred(IsEmpty, _State) -> + not IsEmpty. +deliver_from_queue_deliver(AckRequired, false, + State = #q { mixed_state = MS }) -> {Res, MS2} = rabbit_mixed_queue:deliver(MS), - MS3 = case {Res, AckRequired} of - {_, true} -> MS2; - {empty, _} -> MS2; - {{_Msg, _IsDelivered, AckTag, _Remaining}, false} -> - {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2), - MS4 - end, - {Res, Acc, State #q { mixed_state = MS3 }}. - -run_message_queue(State) -> - {undefined, State2} = - deliver_queue(fun deliver_from_queue/3, undefined, State), + {Res2, MS3, IsEmpty} = + case Res of + empty -> {empty, MS2, true}; + {Msg, IsDelivered, AckTag, Remaining} -> + {ok, MS4} = case AckRequired of + true -> {ok, MS2}; + false -> rabbit_mixed_queue:ack([AckTag], MS2) + end, + {{Msg, IsDelivered, AckTag}, MS4, 0 == Remaining} + end, + {Res2, IsEmpty, State #q { mixed_state = MS3 }}. + +run_message_queue(State = #q { mixed_state = MS }) -> + Funs = { fun deliver_from_queue_pred/2, + fun deliver_from_queue_deliver/3 }, + IsEmpty = rabbit_mixed_queue:is_empty(MS), + {_IsEmpty2, State2} = + deliver_queue(Funs, IsEmpty, State), State2. attempt_immediate_delivery(none, _ChPid, Msg, State) -> - Fun = - fun (is_message_ready, false, _State) -> - true; - (AckRequired, false, State2) -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, State2) -> {AckTag, State3} = case AckRequired of true -> @@ -274,9 +276,9 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> false -> {noack, State2} end, - {{Msg, false, AckTag, 0}, true, State3} + {{Msg, false, AckTag}, true, State3} end, - deliver_queue(Fun, false, State); + deliver_queue({ PredFun, DeliverFun }, false, State); attempt_immediate_delivery(Txn, ChPid, Msg, State) -> {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state), record_pending_message(Txn, ChPid, Msg), @@ -297,9 +299,11 @@ deliver_or_enqueue(Txn, ChPid, Msg, State) -> deliver_or_requeue_n([], State) -> run_message_queue(State); deliver_or_requeue_n(MsgsWithAcks, State) -> + Funs = { fun deliver_or_requeue_msgs_pred/2, + fun deliver_or_requeue_msgs_deliver/3 }, {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = - deliver_queue(fun deliver_or_requeue_msgs/3, - {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State), + deliver_queue(Funs, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, + State), {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), NewState #q.mixed_state), case OutstandingMsgs of @@ -308,15 +312,14 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> NewState #q { mixed_state = MS2 } end. -deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, - _State) -> - -1 < Len; -deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, - State) -> - {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; -deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, - State) -> - {{Msg, true, AckTag, Len}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. +deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> + -1 < Len. +deliver_or_requeue_msgs_deliver( + false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, noack}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; +deliver_or_requeue_msgs_deliver( + true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). |