diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:09:16 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-29 21:09:16 +0100 |
commit | 016416611c55123e4f3d0a8377858b236a2694a0 (patch) | |
tree | c68e9c121b5f042bc03f4899a16226dd9e30746e | |
parent | 6ffbb36794fe101eaef101127645758ab29b4c44 (diff) | |
download | rabbitmq-server-016416611c55123e4f3d0a8377858b236a2694a0.tar.gz |
further abstraction of queue/bpqueue distinction
-rw-r--r-- | src/rabbit_variable_queue.erl | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a624ea33..ad47a7df 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -281,7 +281,7 @@ end_seq_id %% end_seq_id is exclusive }). --record(merge_funs, {join, out, in, publish}). +-record(merge_funs, {new, join, out, in, publish}). %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas @@ -1320,12 +1320,10 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, q4 = Q4, in_counter = InCounter, len = Len } = State) -> - {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4, - queue:new(), [], + {SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4, [], beta_limit(Q3), q4_funs(MsgPropsFun), State), - {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3, - bpqueue:new(), MsgIds, + {SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3, MsgIds, delta_limit(Delta), q3_funs(MsgPropsFun), State1), {Delta1, MsgIds2, State3} = delta_merge(SeqIds2, Delta, MsgIds1, @@ -1338,6 +1336,10 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, len = Len + MsgCount }}. %% Rebuild queue, inserting sequence ids to maintain ordering +queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs, + State) -> + queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State). + queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs, State) @@ -1361,6 +1363,7 @@ queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin }, q4_funs(MsgPropsFun) -> #merge_funs { + new = fun queue:new/0, join = fun queue:join/2, out = fun queue:out/1, in = fun queue:in/2, @@ -1380,6 +1383,7 @@ q4_funs(MsgPropsFun) -> q3_funs(MsgPropsFun) -> #merge_funs { + new = fun bpqueue:new/0, join = fun bpqueue:join/2, out = fun (Q) -> case bpqueue:out(Q) of |