diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-06 18:07:07 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-06 18:07:07 +0100 |
commit | 7321cd2ffca115b78cd6acb7f6548e64a7e4e8a9 (patch) | |
tree | a3efaf542ffaa386720b65a282afa902f000cc37 | |
parent | 72c74753173d556c8bdcdad2a0634342094d34e1 (diff) | |
download | rabbitmq-server-7321cd2ffca115b78cd6acb7f6548e64a7e4e8a9.tar.gz |
Refactoring and cosmetic
-rw-r--r-- | src/rabbit_variable_queue.erl | 62 |
1 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 72fa4aeb..608e2dcd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -526,27 +526,26 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, pending_ack = PA1 })}. dropwhile(Pred, State) -> - case internal_queue_out( - fun(MsgStatus = #msg_status { msg_properties = MsgProps }, - State1) -> - case Pred(MsgProps) of - true -> - {_, State2} = internal_fetch(false, - MsgStatus, State1), - dropwhile(Pred, State2); - false -> - %% message needs to go back into Q4 (or - %% maybe go in for the first time if it was - %% loaded from Q3). Also the msg contents - %% might not be in RAM, so read them in now - {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = - read_msg(MsgStatus, State1), - State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)} - end - end, State) of - {empty, StateR} -> StateR; - StateR -> StateR - end. + {_OkOrEmpty, State1} = dropwhile1(Pred, State), + State1. + +dropwhile1(Pred, State) -> + internal_queue_out( + fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) -> + case Pred(MsgProps) of + true -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile1(Pred, State2); + false -> + %% message needs to go back into Q4 (or maybe go + %% in for the first time if it was loaded from + %% Q3). Also the msg contents might not be in + %% RAM, so read them in now + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + read_msg(MsgStatus, State1), + {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }} + end + end, State). fetch(AckRequired, State) -> internal_queue_out( @@ -589,14 +588,14 @@ read_msg(MsgStatus, State) -> {MsgStatus, State}. internal_fetch(AckRequired, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State = #vqstate { - ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, persistent_count = PCount, - pending_ack = PA }) -> + MsgStatus = #msg_status { + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State = #vqstate { + ram_msg_count = RamMsgCount, out_counter = OutCount, + index_state = IndexState, len = Len, persistent_count = PCount, + pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -625,11 +624,8 @@ internal_fetch(AckRequired, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - RamMsgCount1 = case Msg =:= undefined of - true -> RamMsgCount; - false -> RamMsgCount - 1 - end, {{Msg, IsDelivered, AckTag, Len1}, a(State #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, |