diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:15:15 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:15:15 +0100 |
commit | 1da0d08db25ac9438397fe1f957f3a5b43c2d195 (patch) | |
tree | daa22408e1faa7db4d698778c60d4840cab9735f | |
parent | 016416611c55123e4f3d0a8377858b236a2694a0 (diff) | |
download | rabbitmq-server-1da0d08db25ac9438397fe1f957f3a5b43c2d195.tar.gz |
inlining and reshuffling
-rw-r--r-- | src/rabbit_variable_queue.erl | 95 |
1 files changed, 45 insertions, 50 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ad47a7df..4b6b0811 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -571,9 +571,26 @@ ack(AckTags, State) -> AckTags, State), {MsgIds, a(State1)}. -requeue(AckTags, MsgPropsFun, State) -> - {MsgIds, State1} = requeue_merge(lists:sort(AckTags), MsgPropsFun, State), - {MsgIds, a(reduce_memory_use(State1))}. +requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> + {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], + beta_limit(Q3), + q4_funs(MsgPropsFun), State), + {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, + delta_limit(Delta), + q3_funs(MsgPropsFun), State1), + {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, + MsgPropsFun, State2), + MsgCount = length(MsgIds2), + {MsgIds2, a(reduce_memory_use( + State3 #vqstate { delta = Delta1, + q3 = Q3a, + q4 = Q4a, + in_counter = InCounter + MsgCount, + len = Len + MsgCount }))}. len(#vqstate { len = Len }) -> Len. @@ -1314,53 +1331,6 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %% Internal plumbing for requeue %%---------------------------------------------------------------------------- -requeue_merge(SeqIdsSorted, MsgPropsFun, - #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> - {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4, [], - beta_limit(Q3), - q4_funs(MsgPropsFun), State), - {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3, MsgIds, - delta_limit(Delta), - q3_funs(MsgPropsFun), State1), - {Delta1, MsgIds2, State3} = delta_merge(SeqIds2, Delta, MsgIds1, - MsgPropsFun, State2), - MsgCount = length(MsgIds2), - {MsgIds2, State3 #vqstate { delta = Delta1, - q3 = Q3a, - q4 = Q4a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount }}. - -%% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs, - State) -> - queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State). - -queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, - #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, - State) - when Limit == undefined orelse SeqId < Limit -> - case QOut(Q) of - {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} - when SeqIdQ < SeqId -> - %% enqueue from the remaining queue - queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds, - Limit, Funs, State); - {_, _Q1} -> - %% enqueue from the remaining list of sequence ids - {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - QPublish(SeqId, State), - queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds], - Limit, Funs, State1) - end; -queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, - State) -> - {SeqIds, QJoin(Front, Q), MsgIds, State}. - q4_funs(MsgPropsFun) -> #merge_funs { new = fun queue:new/0, @@ -1414,6 +1384,31 @@ q3_funs(MsgPropsFun) -> one_if(not IndexOnDisk) }} end}. +%% Rebuild queue, inserting sequence ids to maintain ordering +queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs, + State) -> + queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State). + +queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, + #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, + State) + when Limit == undefined orelse SeqId < Limit -> + case QOut(Q) of + {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} + when SeqIdQ < SeqId -> + %% enqueue from the remaining queue + queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds, + Limit, Funs, State); + {_, _Q1} -> + %% enqueue from the remaining list of sequence ids + {#msg_status { msg_id = MsgId } = MsgStatus, State1} = + QPublish(SeqId, State), + queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds], + Limit, Funs, State1) + end; +queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, + State) -> + {SeqIds, QJoin(Front, Q), MsgIds, State}. delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> {Delta, MsgIds, State}; |