diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-02 21:06:02 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-02 21:06:02 +0100 |
commit | 71ecea42652e71feb29ce6870dfa332d9d07c0bd (patch) | |
tree | c34c8bf49d4ac607b15fac275dcce3b3195205cd | |
parent | adbdc99d048620fd355f99750087b1668c8c1ff4 (diff) | |
download | rabbitmq-server-71ecea42652e71feb29ce6870dfa332d9d07c0bd.tar.gz |
refactor: get rid of #merge_funs
-rw-r--r-- | src/rabbit_variable_queue.erl | 83 |
1 files changed, 29 insertions, 54 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1d6a7cf1..482c3d2b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -280,8 +280,6 @@ end_seq_id %% end_seq_id is exclusive }). --record(merge_funs, {new, join, out, in, publish}). - %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -594,11 +592,11 @@ requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), - alpha_funs(), + fun publish_alpha/2, MsgPropsFun, State), {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, delta_limit(Delta), - beta_funs(), + fun publish_beta/2, MsgPropsFun, State1), {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, MsgPropsFun, State2), @@ -1327,69 +1325,46 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %% Internal plumbing for requeue %%---------------------------------------------------------------------------- -alpha_funs() -> - #merge_funs { - new = fun ?QUEUE:new/0, - join = fun ?QUEUE:join/2, - out = fun ?QUEUE:out/1, - in = fun ?QUEUE:in/2, - publish = fun (#msg_status { msg = undefined } = MsgStatus, State) -> - read_msg(MsgStatus, State); - (MsgStatus, #vqstate { - ram_msg_count = RamMsgCount } = State) -> - {MsgStatus, State #vqstate { - ram_msg_count = RamMsgCount + 1 }} - end}. - -beta_funs() -> - #merge_funs { - new = fun ?QUEUE:new/0, - join = fun ?QUEUE:join/2, - out = fun ?QUEUE:out/1, - in = fun ?QUEUE:in/2, - publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, - State) -> - {#msg_status { index_on_disk = IndexOnDisk, - msg = Msg} = MsgStatus1, - #vqstate { ram_index_count = RamIndexCount, - ram_msg_count = RamMsgCount } = - State1} = - maybe_write_to_disk(not MsgOnDisk, false, - MsgStatus, State), - {MsgStatus1, State1 #vqstate { - ram_msg_count = RamMsgCount + - one_if(Msg =/= undefined), - ram_index_count = RamIndexCount + - one_if(not IndexOnDisk) }} - end}. +publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> + read_msg(MsgStatus, State); +publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> + {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. + +publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) -> + {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1, + #vqstate { ram_index_count = RamIndexCount, + ram_msg_count = RamMsgCount } = State1} = + maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), + {MsgStatus1, State1 #vqstate { + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}. %% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs, - MsgPropsFun, State) -> - queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State). +queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) -> + queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, + Limit, PubFun, MsgPropsFun, State). -queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, - #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, - MsgPropsFun, State) +queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, + Limit, PubFun, MsgPropsFun, State) when Limit == undefined orelse SeqId < Limit -> - case QOut(Q) of + case ?QUEUE:out(Q) of {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} when SeqIdQ < SeqId -> %% enqueue from the remaining queue - queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds, - Limit, Funs, MsgPropsFun, State); + queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, + Limit, PubFun, MsgPropsFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State), {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - QPublish(MsgStatus, State1), - queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds], - Limit, Funs, MsgPropsFun, State2) + PubFun(MsgStatus, State1), + queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], + Limit, PubFun, MsgPropsFun, State2) end; -queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, - _MsgPropsFun, State) -> - {SeqIds, QJoin(Front, Q), MsgIds, State}. +queue_merge(SeqIds, Q, Front, MsgIds, + _Limit, _PubFun, _MsgPropsFun, State) -> + {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> {Delta, MsgIds, State}; |