diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:55:52 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:55:52 +0100 |
commit | 500fdc0d25f431aac42a9ef179ec4b5653c2167d (patch) | |
tree | 9ace02cc9220dfeba36413cd67f513c8ee300315 | |
parent | 1da0d08db25ac9438397fe1f957f3a5b43c2d195 (diff) | |
download | rabbitmq-server-500fdc0d25f431aac42a9ef179ec4b5653c2167d.tar.gz |
refactor: pull call to msg_from_pending_ack into queue_merge
thus avoiding duplication, making the alpha/beta fun tables
non-parametric, and containing knowledge of seq ids in queue_merge.
-rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
1 files changed, 32 insertions, 36 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4b6b0811..73488832 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -576,14 +576,16 @@ requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, q4 = Q4, in_counter = InCounter, len = Len } = State) -> - {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], - beta_limit(Q3), - q4_funs(MsgPropsFun), State), + {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], + beta_limit(Q3), + alpha_funs(), + MsgPropsFun, State), {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, delta_limit(Delta), - q3_funs(MsgPropsFun), State1), - {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, - MsgPropsFun, State2), + beta_funs(), + MsgPropsFun, State1), + {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, + MsgPropsFun, State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( State3 #vqstate { delta = Delta1, @@ -1331,27 +1333,21 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %% Internal plumbing for requeue %%---------------------------------------------------------------------------- -q4_funs(MsgPropsFun) -> +alpha_funs() -> #merge_funs { new = fun queue:new/0, join = fun queue:join/2, out = fun queue:out/1, in = fun queue:in/2, - publish = fun (SeqId, State) -> - {#msg_status { msg = Msg } = MsgStatus, - #vqstate { ram_msg_count = RamMsgCount } = State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State), - case Msg of - undefined -> - read_msg(MsgStatus, State1); - #basic_message{} -> - {MsgStatus, State1 #vqstate { - ram_msg_count = - RamMsgCount + 1 }} - end + publish = fun (#msg_status { msg = undefined } = MsgStatus, State) -> + read_msg(MsgStatus, State); + (MsgStatus, #vqstate { + ram_msg_count = RamMsgCount } = State) -> + {MsgStatus, State #vqstate { + ram_msg_count = RamMsgCount + 1 }} end}. -q3_funs(MsgPropsFun) -> +beta_funs() -> #merge_funs { new = fun bpqueue:new/0, join = fun bpqueue:join/2, @@ -1366,18 +1362,16 @@ q3_funs(MsgPropsFun) -> in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) -> bpqueue:in(IOD, MsgStatus, Q) end, - publish = fun (SeqId, State) -> - {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, - State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State), + publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, + State) -> {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1, #vqstate { ram_index_count = RamIndexCount, ram_msg_count = RamMsgCount } = - State2} = + State1} = maybe_write_to_disk(not MsgOnDisk, false, - MsgStatus, State1), - {MsgStatus1, State2 #vqstate { + MsgStatus, State), + {MsgStatus1, State1 #vqstate { ram_msg_count = RamMsgCount + one_if(Msg =/= undefined), ram_index_count = RamIndexCount + @@ -1386,28 +1380,30 @@ q3_funs(MsgPropsFun) -> %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs, - State) -> - queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State). + MsgPropsFun, State) -> + queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State). queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, - State) + MsgPropsFun, State) when Limit == undefined orelse SeqId < Limit -> case QOut(Q) of {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} when SeqIdQ < SeqId -> %% enqueue from the remaining queue queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds, - Limit, Funs, State); + Limit, Funs, MsgPropsFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids - {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - QPublish(SeqId, State), - queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds], - Limit, Funs, State1) + {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, + State), + {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = + QPublish(MsgStatus, State1), + queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds], + Limit, Funs, MsgPropsFun, State2) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, - State) -> + _MsgPropsFun, State) -> {SeqIds, QJoin(Front, Q), MsgIds, State}. delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> |