summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:55:52 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:55:52 +0100
commit500fdc0d25f431aac42a9ef179ec4b5653c2167d (patch)
tree9ace02cc9220dfeba36413cd67f513c8ee300315
parent1da0d08db25ac9438397fe1f957f3a5b43c2d195 (diff)
downloadrabbitmq-server-500fdc0d25f431aac42a9ef179ec4b5653c2167d.tar.gz
refactor: pull call to msg_from_pending_ack into queue_merge
thus avoiding duplication, making the alpha/beta fun tables non-parametric, and containing knowledge of seq ids in queue_merge.
-rw-r--r--src/rabbit_variable_queue.erl68
1 files changed, 32 insertions, 36 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4b6b0811..73488832 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -576,14 +576,16 @@ requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
q4 = Q4,
in_counter = InCounter,
len = Len } = State) ->
- {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
- beta_limit(Q3),
- q4_funs(MsgPropsFun), State),
+ {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
+ beta_limit(Q3),
+ alpha_funs(),
+ MsgPropsFun, State),
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
delta_limit(Delta),
- q3_funs(MsgPropsFun), State1),
- {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
- MsgPropsFun, State2),
+ beta_funs(),
+ MsgPropsFun, State1),
+ {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
+ MsgPropsFun, State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
State3 #vqstate { delta = Delta1,
@@ -1331,27 +1333,21 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
-q4_funs(MsgPropsFun) ->
+alpha_funs() ->
#merge_funs {
new = fun queue:new/0,
join = fun queue:join/2,
out = fun queue:out/1,
in = fun queue:in/2,
- publish = fun (SeqId, State) ->
- {#msg_status { msg = Msg } = MsgStatus,
- #vqstate { ram_msg_count = RamMsgCount } = State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State),
- case Msg of
- undefined ->
- read_msg(MsgStatus, State1);
- #basic_message{} ->
- {MsgStatus, State1 #vqstate {
- ram_msg_count =
- RamMsgCount + 1 }}
- end
+ publish = fun (#msg_status { msg = undefined } = MsgStatus, State) ->
+ read_msg(MsgStatus, State);
+ (MsgStatus, #vqstate {
+ ram_msg_count = RamMsgCount } = State) ->
+ {MsgStatus, State #vqstate {
+ ram_msg_count = RamMsgCount + 1 }}
end}.
-q3_funs(MsgPropsFun) ->
+beta_funs() ->
#merge_funs {
new = fun bpqueue:new/0,
join = fun bpqueue:join/2,
@@ -1366,18 +1362,16 @@ q3_funs(MsgPropsFun) ->
in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
bpqueue:in(IOD, MsgStatus, Q)
end,
- publish = fun (SeqId, State) ->
- {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
- State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State),
+ publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
+ State) ->
{#msg_status { index_on_disk = IndexOnDisk,
msg = Msg} = MsgStatus1,
#vqstate { ram_index_count = RamIndexCount,
ram_msg_count = RamMsgCount } =
- State2} =
+ State1} =
maybe_write_to_disk(not MsgOnDisk, false,
- MsgStatus, State1),
- {MsgStatus1, State2 #vqstate {
+ MsgStatus, State),
+ {MsgStatus1, State1 #vqstate {
ram_msg_count = RamMsgCount +
one_if(Msg =/= undefined),
ram_index_count = RamIndexCount +
@@ -1386,28 +1380,30 @@ q3_funs(MsgPropsFun) ->
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
- State) ->
- queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State).
+ MsgPropsFun, State) ->
+ queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State).
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
#merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
- State)
+ MsgPropsFun, State)
when Limit == undefined orelse SeqId < Limit ->
case QOut(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
- Limit, Funs, State);
+ Limit, Funs, MsgPropsFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
- {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
- QPublish(SeqId, State),
- queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds],
- Limit, Funs, State1)
+ {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
+ State),
+ {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
+ QPublish(MsgStatus, State1),
+ queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, Funs, MsgPropsFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
- State) ->
+ _MsgPropsFun, State) ->
{SeqIds, QJoin(Front, Q), MsgIds, State}.
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->