summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:09:16 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-29 21:09:16 +0100
commit016416611c55123e4f3d0a8377858b236a2694a0 (patch)
treec68e9c121b5f042bc03f4899a16226dd9e30746e
parent6ffbb36794fe101eaef101127645758ab29b4c44 (diff)
downloadrabbitmq-server-016416611c55123e4f3d0a8377858b236a2694a0.tar.gz
further abstraction of queue/bpqueue distinction
-rw-r--r--src/rabbit_variable_queue.erl14
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a624ea33..ad47a7df 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -281,7 +281,7 @@
end_seq_id %% end_seq_id is exclusive
}).
--record(merge_funs, {join, out, in, publish}).
+-record(merge_funs, {new, join, out, in, publish}).
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
@@ -1320,12 +1320,10 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
q4 = Q4,
in_counter = InCounter,
len = Len } = State) ->
- {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4,
- queue:new(), [],
+ {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4, [],
beta_limit(Q3),
q4_funs(MsgPropsFun), State),
- {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3,
- bpqueue:new(), MsgIds,
+ {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3, MsgIds,
delta_limit(Delta),
q3_funs(MsgPropsFun), State1),
{Delta1, MsgIds2, State3} = delta_merge(SeqIds2, Delta, MsgIds1,
@@ -1338,6 +1336,10 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
len = Len + MsgCount }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
+queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
+ State) ->
+ queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State).
+
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
#merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
State)
@@ -1361,6 +1363,7 @@ queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
q4_funs(MsgPropsFun) ->
#merge_funs {
+ new = fun queue:new/0,
join = fun queue:join/2,
out = fun queue:out/1,
in = fun queue:in/2,
@@ -1380,6 +1383,7 @@ q4_funs(MsgPropsFun) ->
q3_funs(MsgPropsFun) ->
#merge_funs {
+ new = fun bpqueue:new/0,
join = fun bpqueue:join/2,
out = fun (Q) ->
case bpqueue:out(Q) of