summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-06 18:07:07 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-06 18:07:07 +0100
commit7321cd2ffca115b78cd6acb7f6548e64a7e4e8a9 (patch)
treea3efaf542ffaa386720b65a282afa902f000cc37
parent72c74753173d556c8bdcdad2a0634342094d34e1 (diff)
downloadrabbitmq-server-7321cd2ffca115b78cd6acb7f6548e64a7e4e8a9.tar.gz
Refactoring and cosmetic
-rw-r--r--src/rabbit_variable_queue.erl62
1 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 72fa4aeb..608e2dcd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -526,27 +526,26 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
pending_ack = PA1 })}.
dropwhile(Pred, State) ->
- case internal_queue_out(
- fun(MsgStatus = #msg_status { msg_properties = MsgProps },
- State1) ->
- case Pred(MsgProps) of
- true ->
- {_, State2} = internal_fetch(false,
- MsgStatus, State1),
- dropwhile(Pred, State2);
- false ->
- %% message needs to go back into Q4 (or
- %% maybe go in for the first time if it was
- %% loaded from Q3). Also the msg contents
- %% might not be in RAM, so read them in now
- {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
- read_msg(MsgStatus, State1),
- State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)}
- end
- end, State) of
- {empty, StateR} -> StateR;
- StateR -> StateR
- end.
+ {_OkOrEmpty, State1} = dropwhile1(Pred, State),
+ State1.
+
+dropwhile1(Pred, State) ->
+ internal_queue_out(
+ fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) ->
+ case Pred(MsgProps) of
+ true ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile1(Pred, State2);
+ false ->
+ %% message needs to go back into Q4 (or maybe go
+ %% in for the first time if it was loaded from
+ %% Q3). Also the msg contents might not be in
+ %% RAM, so read them in now
+ {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
+ read_msg(MsgStatus, State1),
+ {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }}
+ end
+ end, State).
fetch(AckRequired, State) ->
internal_queue_out(
@@ -589,14 +588,14 @@ read_msg(MsgStatus, State) ->
{MsgStatus, State}.
internal_fetch(AckRequired,
- MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
- State = #vqstate {
- ram_msg_count = RamMsgCount, out_counter = OutCount,
- index_state = IndexState, len = Len, persistent_count = PCount,
- pending_ack = PA }) ->
+ MsgStatus = #msg_status {
+ msg = Msg, guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
+ State = #vqstate {
+ ram_msg_count = RamMsgCount, out_counter = OutCount,
+ index_state = IndexState, len = Len, persistent_count = PCount,
+ pending_ack = PA }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -625,11 +624,8 @@ internal_fetch(AckRequired,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
+ RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- RamMsgCount1 = case Msg =:= undefined of
- true -> RamMsgCount;
- false -> RamMsgCount - 1
- end,
{{Msg, IsDelivered, AckTag, Len1},
a(State #vqstate { ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,