summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-02 21:06:02 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-02 21:06:02 +0100
commit71ecea42652e71feb29ce6870dfa332d9d07c0bd (patch)
treec34c8bf49d4ac607b15fac275dcce3b3195205cd
parentadbdc99d048620fd355f99750087b1668c8c1ff4 (diff)
downloadrabbitmq-server-71ecea42652e71feb29ce6870dfa332d9d07c0bd.tar.gz
refactor: get rid of #merge_funs
-rw-r--r--src/rabbit_variable_queue.erl83
1 files changed, 29 insertions, 54 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1d6a7cf1..482c3d2b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -280,8 +280,6 @@
end_seq_id %% end_seq_id is exclusive
}).
--record(merge_funs, {new, join, out, in, publish}).
-
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -594,11 +592,11 @@ requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
- alpha_funs(),
+ fun publish_alpha/2,
MsgPropsFun, State),
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
delta_limit(Delta),
- beta_funs(),
+ fun publish_beta/2,
MsgPropsFun, State1),
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
MsgPropsFun, State2),
@@ -1327,69 +1325,46 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
-alpha_funs() ->
- #merge_funs {
- new = fun ?QUEUE:new/0,
- join = fun ?QUEUE:join/2,
- out = fun ?QUEUE:out/1,
- in = fun ?QUEUE:in/2,
- publish = fun (#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
- (MsgStatus, #vqstate {
- ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate {
- ram_msg_count = RamMsgCount + 1 }}
- end}.
-
-beta_funs() ->
- #merge_funs {
- new = fun ?QUEUE:new/0,
- join = fun ?QUEUE:join/2,
- out = fun ?QUEUE:out/1,
- in = fun ?QUEUE:in/2,
- publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
- State) ->
- {#msg_status { index_on_disk = IndexOnDisk,
- msg = Msg} = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } =
- State1} =
- maybe_write_to_disk(not MsgOnDisk, false,
- MsgStatus, State),
- {MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount +
- one_if(Msg =/= undefined),
- ram_index_count = RamIndexCount +
- one_if(not IndexOnDisk) }}
- end}.
+publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
+ read_msg(MsgStatus, State);
+publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
+ {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+
+publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) ->
+ {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1,
+ #vqstate { ram_index_count = RamIndexCount,
+ ram_msg_count = RamMsgCount } = State1} =
+ maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
+ {MsgStatus1, State1 #vqstate {
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
- MsgPropsFun, State) ->
- queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State).
+queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
+ queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
+ Limit, PubFun, MsgPropsFun, State).
-queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
- #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
- MsgPropsFun, State)
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
+ Limit, PubFun, MsgPropsFun, State)
when Limit == undefined orelse SeqId < Limit ->
- case QOut(Q) of
+ case ?QUEUE:out(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
- queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
- Limit, Funs, MsgPropsFun, State);
+ queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
+ Limit, PubFun, MsgPropsFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
{MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
State),
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
- QPublish(MsgStatus, State1),
- queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, Funs, MsgPropsFun, State2)
+ PubFun(MsgStatus, State1),
+ queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, PubFun, MsgPropsFun, State2)
end;
-queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
- _MsgPropsFun, State) ->
- {SeqIds, QJoin(Front, Q), MsgIds, State}.
+queue_merge(SeqIds, Q, Front, MsgIds,
+ _Limit, _PubFun, _MsgPropsFun, State) ->
+ {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
{Delta, MsgIds, State};