summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-17 14:10:17 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-17 14:10:17 +0100
commitb2722146d4e3eb4b80f952a2dc01f631e78a3e2d (patch)
treeaba19b9927ccf5456b2e3fa34bf24518f42ef882
parent67af217a39218ee8cf5e344786e8166c821adcdb (diff)
downloadrabbitmq-server-b2722146d4e3eb4b80f952a2dc01f631e78a3e2d.tar.gz
adjusted HO-ness in deliver queue beautifully. Thus in the deliver_from_queue case, we now reduce n calls to mixed_queue:is_empty to 1 call and pass around the remaining count as the acc. l33t
-rw-r--r--src/rabbit_amqqueue_process.erl87
1 files changed, 45 insertions, 42 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 620b497b..0ab44a53 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -170,7 +170,7 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_queue(Fun, FunAcc0,
+deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
@@ -182,12 +182,12 @@ deliver_queue(Fun, FunAcc0,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- IsMsgReady = Fun(is_message_ready, FunAcc0, State),
+ IsMsgReady = PredFun(FunAcc0, State),
case (IsMsgReady andalso
rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
- {{Msg, IsDelivered, AckTag, Remaining}, FunAcc1, State2} =
- Fun(AckRequired, FunAcc0, State),
+ {{Msg, IsDelivered, AckTag}, FunAcc1, State2} =
+ DeliverFun(AckRequired, FunAcc0, State),
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]),
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
@@ -217,10 +217,7 @@ deliver_queue(Fun, FunAcc0,
blocked_consumers = NewBlockedConsumers,
next_msg_id = NextId + 1
},
- case Remaining of
- 0 -> {FunAcc1, State3};
- _ -> deliver_queue(Fun, FunAcc1, State3)
- end;
+ deliver_queue(Funs, FunAcc1, State3);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
@@ -229,7 +226,7 @@ deliver_queue(Fun, FunAcc0,
ActiveConsumers,
BlockedConsumers),
deliver_queue(
- Fun, FunAcc0,
+ Funs, FunAcc0,
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers});
false ->
@@ -240,30 +237,35 @@ deliver_queue(Fun, FunAcc0,
{FunAcc0, State}
end.
-deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) ->
- not rabbit_mixed_queue:is_empty(MS);
-deliver_from_queue(AckRequired, Acc = undefined,
- State = #q { mixed_state = MS }) ->
+deliver_from_queue_pred(IsEmpty, _State) ->
+ not IsEmpty.
+deliver_from_queue_deliver(AckRequired, false,
+ State = #q { mixed_state = MS }) ->
{Res, MS2} = rabbit_mixed_queue:deliver(MS),
- MS3 = case {Res, AckRequired} of
- {_, true} -> MS2;
- {empty, _} -> MS2;
- {{_Msg, _IsDelivered, AckTag, _Remaining}, false} ->
- {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2),
- MS4
- end,
- {Res, Acc, State #q { mixed_state = MS3 }}.
-
-run_message_queue(State) ->
- {undefined, State2} =
- deliver_queue(fun deliver_from_queue/3, undefined, State),
+ {Res2, MS3, IsEmpty} =
+ case Res of
+ empty -> {empty, MS2, true};
+ {Msg, IsDelivered, AckTag, Remaining} ->
+ {ok, MS4} = case AckRequired of
+ true -> {ok, MS2};
+ false -> rabbit_mixed_queue:ack([AckTag], MS2)
+ end,
+ {{Msg, IsDelivered, AckTag}, MS4, 0 == Remaining}
+ end,
+ {Res2, IsEmpty, State #q { mixed_state = MS3 }}.
+
+run_message_queue(State = #q { mixed_state = MS }) ->
+ Funs = { fun deliver_from_queue_pred/2,
+ fun deliver_from_queue_deliver/3 },
+ IsEmpty = rabbit_mixed_queue:is_empty(MS),
+ {_IsEmpty2, State2} =
+ deliver_queue(Funs, IsEmpty, State),
State2.
attempt_immediate_delivery(none, _ChPid, Msg, State) ->
- Fun =
- fun (is_message_ready, false, _State) ->
- true;
- (AckRequired, false, State2) ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false, State2) ->
{AckTag, State3} =
case AckRequired of
true ->
@@ -274,9 +276,9 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
false ->
{noack, State2}
end,
- {{Msg, false, AckTag, 0}, true, State3}
+ {{Msg, false, AckTag}, true, State3}
end,
- deliver_queue(Fun, false, State);
+ deliver_queue({ PredFun, DeliverFun }, false, State);
attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
{ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
record_pending_message(Txn, ChPid, Msg),
@@ -297,9 +299,11 @@ deliver_or_enqueue(Txn, ChPid, Msg, State) ->
deliver_or_requeue_n([], State) ->
run_message_queue(State);
deliver_or_requeue_n(MsgsWithAcks, State) ->
+ Funs = { fun deliver_or_requeue_msgs_pred/2,
+ fun deliver_or_requeue_msgs_deliver/3 },
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
- deliver_queue(fun deliver_or_requeue_msgs/3,
- {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State),
+ deliver_queue(Funs, {length(MsgsWithAcks) - 1, [], MsgsWithAcks},
+ State),
{ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks),
NewState #q.mixed_state),
case OutstandingMsgs of
@@ -308,15 +312,14 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
NewState #q { mixed_state = MS2 }
end.
-deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks},
- _State) ->
- -1 < Len;
-deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]},
- State) ->
- {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
-deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]},
- State) ->
- {{Msg, true, AckTag, Len}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
+deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
+ -1 < Len.
+deliver_or_requeue_msgs_deliver(
+ false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, noack}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
+deliver_or_requeue_msgs_deliver(
+ true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).