diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-12 10:18:28 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-12 10:18:28 +0000 |
commit | 0c84e4f85fa92a384646af16ec9087696c65c139 (patch) | |
tree | a44f39ecf56a189fef6732da04a92a0026bd452b | |
parent | e2393df71b70a19895cd27cde9ead044930c83fa (diff) | |
download | rabbitmq-server-0c84e4f85fa92a384646af16ec9087696c65c139.tar.gz |
unmodalise vq:read_msg
-rw-r--r-- | src/rabbit_variable_queue.erl | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 90ee3439..285dfcd7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -527,7 +527,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), @@ -538,12 +537,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - unconfirmed = UC1 })). + a(reduce_memory_use( + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -596,7 +595,7 @@ fetchwhile(Pred, Fun, Acc, State) -> {undefined, Acc, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of - true -> {Msg, State2} = read_msg(MsgStatus, false, State1), + true -> {Msg, State2} = read_msg(MsgStatus, State1), {AckTag, State3} = remove(true, MsgStatus, State2), fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} @@ -610,7 +609,7 @@ fetch(AckRequired, State) -> {{value, MsgStatus}, State1} -> %% it is possible that the message wasn't read from disk %% at this point, so read it in. - {Msg, State2} = read_msg(MsgStatus, false, State1), + {Msg, State2} = read_msg(MsgStatus, State1), {AckTag, State3} = remove(AckRequired, MsgStatus, State2), {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} end. @@ -672,7 +671,7 @@ ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = lists:foldl(fun(SeqId, {Acc0, State0}) -> MsgStatus = lookup_pending_ack(SeqId, State0), - {Msg, State1} = read_msg(MsgStatus, false, State0), + {Msg, State1} = read_msg(MsgStatus, State0), {MsgFun(Msg, SeqId, Acc0), State1} end, {Acc, State}, AckTags), {AccN, a(StateN)}. @@ -684,7 +683,7 @@ fold(Fun, Acc, #vqstate { q1 = Q1, q3 = Q3, q4 = Q4 } = State) -> QFun = fun(MsgStatus, {Acc0, State0}) -> - {Msg, State1} = read_msg(MsgStatus, false, State0), + {Msg, State1} = read_msg(MsgStatus, State0), {StopGo, AccNext} = Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), {StopGo, {AccNext, State1}} @@ -1078,9 +1077,10 @@ in_r(MsgStatus = #msg_status { msg = undefined }, case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, true, State), - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) } + read_msg(MsgStatus, State), + inc_ram_msg_count( + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1096,19 +1096,19 @@ queue_out(State = #vqstate { q4 = Q4 }) -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. -read_msg(#msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, - CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> +read_msg(#msg_status{msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent}, + State = #vqstate{msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam), - {Msg, State #vqstate { ram_msg_count = RamMsgCount1, - msg_store_clients = MSCState1 }}; -read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) -> + {Msg, State #vqstate {msg_store_clients = MSCState1}}; +read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. +inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> + State#vqstate{ram_msg_count = RamMsgCount + 1}. + remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1390,10 +1390,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - {Msg, State1} = read_msg(MsgStatus, true, State), - {MsgStatus#msg_status { msg = Msg }, State1}; -publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> - {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. + {Msg, State1} = read_msg(MsgStatus, State), + {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)}; +publish_alpha(MsgStatus, State) -> + {MsgStatus, inc_ram_msg_count(State)}. publish_beta(MsgStatus, State) -> {#msg_status { msg = Msg} = MsgStatus1, |