diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-06-25 15:31:27 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-06-25 15:31:27 +0100 |
commit | 4a41e885f65c9db368166c8dae2777af6dc57993 (patch) | |
tree | c3111fab30d1384663fa11c2525b94026c154238 /src | |
parent | db26ed9e1f432756c4906b371f5ae9f88d5106ec (diff) | |
download | rabbitmq-server-4a41e885f65c9db368166c8dae2777af6dc57993.tar.gz |
cosmetic: move helpers to where they belong
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_variable_queue.erl | 176 |
1 files changed, 88 insertions, 88 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b8fbf140..e997bb61 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -572,18 +572,6 @@ dropwhile(Pred, State) -> end end. -in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - case queue:is_empty(Q4) of - true -> State #vqstate { - q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; - false -> {MsgStatus1, State1} = read_msg(MsgStatus, State), - State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4) } - end; -in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. - fetch(AckRequired, State) -> case queue_out(State) of {empty, State1} -> @@ -596,82 +584,6 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. -queue_out(State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of - {empty, _Q4} -> - case fetch_from_q3(State) of - {empty, _State1} = Result -> Result; - {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} - end; - {{value, MsgStatus}, Q4a} -> - {{value, MsgStatus}, State #vqstate { q4 = Q4a }} - end. - -read_msg(MsgStatus = #msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, - msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, State) -> - {MsgStatus, State}. - -internal_fetch(AckRequired, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount }) -> - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - Rem = fun () -> - ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) - end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - - {{Msg, IsDelivered, AckTag, Len1}, - State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1 }}. - ack(AckTags, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, fun (_, State0) -> State0 end, @@ -1145,6 +1057,94 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. +in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, + State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> + case queue:is_empty(Q4) of + true -> State #vqstate { + q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; + false -> {MsgStatus1, State1} = read_msg(MsgStatus, State), + State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4) } + end; +in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> + State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + +queue_out(State = #vqstate { q4 = Q4 }) -> + case queue:out(Q4) of + {empty, _Q4} -> + case fetch_from_q3(State) of + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} + end; + {{value, MsgStatus}, Q4a} -> + {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + end. + +read_msg(MsgStatus = #msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. + +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) + end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + + {{Msg, IsDelivered, AckTag, Len1}, + State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 }}. + msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, AsyncCallback, SyncCallback) -> case SyncCallback(?MODULE, |