summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-06-25 15:31:27 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-06-25 15:31:27 +0100
commit4a41e885f65c9db368166c8dae2777af6dc57993 (patch)
treec3111fab30d1384663fa11c2525b94026c154238 /src
parentdb26ed9e1f432756c4906b371f5ae9f88d5106ec (diff)
downloadrabbitmq-server-4a41e885f65c9db368166c8dae2777af6dc57993.tar.gz
cosmetic: move helpers to where they belong
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl176
1 files changed, 88 insertions, 88 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b8fbf140..e997bb61 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -572,18 +572,6 @@ dropwhile(Pred, State) ->
end
end.
-in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
- State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
- case queue:is_empty(Q4) of
- true -> State #vqstate {
- q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
- false -> {MsgStatus1, State1} = read_msg(MsgStatus, State),
- State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4) }
- end;
-in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
- State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
-
fetch(AckRequired, State) ->
case queue_out(State) of
{empty, State1} ->
@@ -596,82 +584,6 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
-queue_out(State = #vqstate { q4 = Q4 }) ->
- case queue:out(Q4) of
- {empty, _Q4} ->
- case fetch_from_q3(State) of
- {empty, _State1} = Result -> Result;
- {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
- end;
- {{value, MsgStatus}, Q4a} ->
- {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
- end.
-
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
- State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + 1,
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, State) ->
- {MsgStatus, State}.
-
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
- %% 1. Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- %% 2. Remove from msg_store and queue index, if necessary
- Rem = fun () ->
- ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
- end,
- Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
- end,
-
- %% 3. If an ack is required, add something sensible to PA
- {AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
- RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
-
- {{Msg, IsDelivered, AckTag, Len1},
- State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1 }}.
-
ack(AckTags, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,
fun (_, State0) -> State0 end,
@@ -1145,6 +1057,94 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
+in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
+ State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
+ case queue:is_empty(Q4) of
+ true -> State #vqstate {
+ q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+ false -> {MsgStatus1, State1} = read_msg(MsgStatus, State),
+ State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4) }
+ end;
+in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
+ State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+
+queue_out(State = #vqstate { q4 = Q4 }) ->
+ case queue:out(Q4) of
+ {empty, _Q4} ->
+ case fetch_from_q3(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
+ end;
+ {{value, MsgStatus}, Q4a} ->
+ {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
+ end.
+
+read_msg(MsgStatus = #msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent },
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {MsgStatus #msg_status { msg = Msg },
+ State #vqstate { ram_msg_count = RamMsgCount + 1,
+ msg_store_clients = MSCState1 }};
+read_msg(MsgStatus, State) ->
+ {MsgStatus, State}.
+
+internal_fetch(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
+ %% 1. Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ %% 2. Remove from msg_store and queue index, if necessary
+ Rem = fun () ->
+ ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
+ end,
+ Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
+ IndexState2 =
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
+ {false, true, false, _} -> Rem(), IndexState1;
+ {false, true, true, _} -> Rem(), Ack();
+ { true, true, true, false} -> Ack();
+ _ -> IndexState1
+ end,
+
+ %% 3. If an ack is required, add something sensible to PA
+ {AckTag, State1} = case AckRequired of
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
+
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ Len1 = Len - 1,
+ RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
+
+ {{Msg, IsDelivered, AckTag, Len1},
+ State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1 }}.
+
msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
AsyncCallback, SyncCallback) ->
case SyncCallback(?MODULE,