summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:15:15 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:15:15 +0100
commit1da0d08db25ac9438397fe1f957f3a5b43c2d195 (patch)
treedaa22408e1faa7db4d698778c60d4840cab9735f
parent016416611c55123e4f3d0a8377858b236a2694a0 (diff)
downloadrabbitmq-server-1da0d08db25ac9438397fe1f957f3a5b43c2d195.tar.gz
inlining and reshuffling
-rw-r--r--src/rabbit_variable_queue.erl95
1 files changed, 45 insertions, 50 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ad47a7df..4b6b0811 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -571,9 +571,26 @@ ack(AckTags, State) ->
AckTags, State),
{MsgIds, a(State1)}.
-requeue(AckTags, MsgPropsFun, State) ->
- {MsgIds, State1} = requeue_merge(lists:sort(AckTags), MsgPropsFun, State),
- {MsgIds, a(reduce_memory_use(State1))}.
+requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
+ beta_limit(Q3),
+ q4_funs(MsgPropsFun), State),
+ {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
+ delta_limit(Delta),
+ q3_funs(MsgPropsFun), State1),
+ {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
+ MsgPropsFun, State2),
+ MsgCount = length(MsgIds2),
+ {MsgIds2, a(reduce_memory_use(
+ State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount }))}.
len(#vqstate { len = Len }) -> Len.
@@ -1314,53 +1331,6 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
-requeue_merge(SeqIdsSorted, MsgPropsFun,
- #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
- {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4, [],
- beta_limit(Q3),
- q4_funs(MsgPropsFun), State),
- {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3, MsgIds,
- delta_limit(Delta),
- q3_funs(MsgPropsFun), State1),
- {Delta1, MsgIds2, State3} = delta_merge(SeqIds2, Delta, MsgIds1,
- MsgPropsFun, State2),
- MsgCount = length(MsgIds2),
- {MsgIds2, State3 #vqstate { delta = Delta1,
- q3 = Q3a,
- q4 = Q4a,
- in_counter = InCounter + MsgCount,
- len = Len + MsgCount }}.
-
-%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
- State) ->
- queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State).
-
-queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
- #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
- State)
- when Limit == undefined orelse SeqId < Limit ->
- case QOut(Q) of
- {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
- when SeqIdQ < SeqId ->
- %% enqueue from the remaining queue
- queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
- Limit, Funs, State);
- {_, _Q1} ->
- %% enqueue from the remaining list of sequence ids
- {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
- QPublish(SeqId, State),
- queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds],
- Limit, Funs, State1)
- end;
-queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
- State) ->
- {SeqIds, QJoin(Front, Q), MsgIds, State}.
-
q4_funs(MsgPropsFun) ->
#merge_funs {
new = fun queue:new/0,
@@ -1414,6 +1384,31 @@ q3_funs(MsgPropsFun) ->
one_if(not IndexOnDisk) }}
end}.
+%% Rebuild queue, inserting sequence ids to maintain ordering
+queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
+ State) ->
+ queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State).
+
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
+ #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
+ State)
+ when Limit == undefined orelse SeqId < Limit ->
+ case QOut(Q) of
+ {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
+ when SeqIdQ < SeqId ->
+ %% enqueue from the remaining queue
+ queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
+ Limit, Funs, State);
+ {_, _Q1} ->
+ %% enqueue from the remaining list of sequence ids
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
+ QPublish(SeqId, State),
+ queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds],
+ Limit, Funs, State1)
+ end;
+queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
+ State) ->
+ {SeqIds, QJoin(Front, Q), MsgIds, State}.
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
{Delta, MsgIds, State};