summaryrefslogtreecommitdiff
path: root/src/bpqueue.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-13 23:46:08 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-13 23:46:08 +0000
commitc81bbf53513c34ce8be4f6e8308102c46955f928 (patch)
treed26b8260df9fb5b11e153fcef8c249d80975bcf4 /src/bpqueue.erl
parent263192acc15329753dc539a2c5b57278a7df6cda (diff)
downloadrabbitmq-server-c81bbf53513c34ce8be4f6e8308102c46955f928.tar.gz
Much better. The reason why batching is important is because if you're walking through the bpqueue and doing very little work before stopping then you don't really get the amortised constant time behaviour. But the goal is achieved - throughput is maintained and very slowly diminishes with no major interruptions and the queue gets fuller and the transition is made to betas and then deltas.
Diffstat (limited to 'src/bpqueue.erl')
-rw-r--r--src/bpqueue.erl164
1 files changed, 127 insertions, 37 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
index 7237473f..a556ec23 100644
--- a/src/bpqueue.erl
+++ b/src/bpqueue.erl
@@ -63,12 +63,12 @@
-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
-spec(map_fold_filter_l/4 ::
(fun ((prefix()) -> boolean()),
- fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
- {bpqueue(), B}).
+ fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
+ bpqueue()) -> {bpqueue(), B}).
-spec(map_fold_filter_r/4 ::
(fun ((prefix()) -> boolean()),
- fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
- {bpqueue(), B}).
+ fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
+ bpqueue()) -> {bpqueue(), B}).
-endif.
@@ -107,6 +107,40 @@ in_r(Prefix, Value, {N, Q}) ->
queue:in_r({Prefix, queue:in(Value, queue:new())}, Q)
end}.
+in_q(Prefix, Queue, BPQ = {0, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ N -> {N, queue:in({Prefix, Queue}, Q)}
+ end;
+in_q(Prefix, Queue, BPQ = {N, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ M -> {N + M,
+ case queue:out_r(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in({Prefix, queue:join(InnerQ, Queue)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in({Prefix, Queue}, Q)
+ end}
+ end.
+
+in_q_r(Prefix, Queue, BPQ = {0, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ N -> {N, queue:in({Prefix, Queue}, Q)}
+ end;
+in_q_r(Prefix, Queue, BPQ = {N, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ M -> {N + M,
+ case queue:out(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in_r({Prefix, queue:join(Queue, InnerQ)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in_r({Prefix, Queue}, Q)
+ end}
+ end.
+
out({0, _Q} = BPQ) ->
{empty, BPQ};
out({N, Q}) ->
@@ -195,7 +229,7 @@ to_list1({Prefix, InnerQ}) ->
%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
%% where FilterFun(Prefix) -> boolean()
-%% Fun(Value, Init) -> {Prefix, Value, Init}
+%% Fun(Value, Init) -> {Prefix, Value, Init} | stop
%%
%% The filter fun allows you to skip very quickly over blocks that
%% you're not interested in. Such blocks appear in the resulting bpq
@@ -204,50 +238,106 @@ to_list1({Prefix, InnerQ}) ->
%% value, and also to modify the Init/Acc (just like a fold).
map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
{BPQ, Init};
-map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) ->
- map_fold_filter_l1(PFilter, Fun, Init, Q, new()).
+map_fold_filter_l(PFilter, Fun, Init, {N, Q}) ->
+ map_fold_filter_l1(N, PFilter, Fun, Init, Q, new()).
-map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) ->
+map_fold_filter_l1(Len, PFilter, Fun, Init, Q, QNew) ->
case queue:out(Q) of
{empty, _Q} ->
{QNew, Init};
{{value, {Prefix, InnerQ}}, Q1} ->
- InnerList = queue:to_list(InnerQ),
- {Init1, QNew1} =
- case PFilter(Prefix) of
- true ->
- lists:foldl(
- fun (Value, {Acc, QNew2}) ->
- {Prefix1, Value1, Acc1} = Fun(Value, Acc),
- {Acc1, in(Prefix1, Value1, QNew2)}
- end, {Init, QNew}, InnerList);
- false ->
- {Init, join(QNew, from_list([{Prefix, InnerList}]))}
- end,
- map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1)
+ case PFilter(Prefix) of
+ true ->
+ {Init1, QNew1, Cont} =
+ map_fold_filter_l2(
+ Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()),
+ case Cont of
+ false ->
+ {join(QNew1, {Len - len(QNew1), Q1}), Init1};
+ true ->
+ map_fold_filter_l1(
+ Len, PFilter, Fun, Init1, Q1, QNew1)
+ end;
+ false ->
+ map_fold_filter_l1(
+ Len, PFilter, Fun, Init, Q1, in_q(Prefix, InnerQ, QNew))
+ end
+ end.
+
+map_fold_filter_l2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) ->
+ case queue:out(InnerQ) of
+ {empty, _Q} ->
+ {Init, in_q(OrigPrefix, InnerQ,
+ in_q(Prefix, InnerQNew, QNew)), true};
+ {{value, Value}, InnerQ1} ->
+ case Fun(Value, Init) of
+ stop ->
+ {Init, in_q(OrigPrefix, InnerQ,
+ in_q(Prefix, InnerQNew, QNew)), false};
+ {Prefix1, Value1, Init1} ->
+ case Prefix1 =:= Prefix of
+ true ->
+ map_fold_filter_l2(
+ Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew,
+ queue:in(Value1, InnerQNew));
+ false ->
+ map_fold_filter_l2(
+ Fun, OrigPrefix, Prefix1, Init1, InnerQ1,
+ in_q(Prefix, InnerQNew, QNew),
+ queue:in(Value1, queue:new()))
+ end
+ end
end.
map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
{BPQ, Init};
-map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) ->
- map_fold_filter_r1(PFilter, Fun, Init, Q, new()).
+map_fold_filter_r(PFilter, Fun, Init, {N, Q}) ->
+ map_fold_filter_r1(N, PFilter, Fun, Init, Q, new()).
-map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) ->
+map_fold_filter_r1(Len, PFilter, Fun, Init, Q, QNew) ->
case queue:out_r(Q) of
{empty, _Q} ->
{QNew, Init};
{{value, {Prefix, InnerQ}}, Q1} ->
- InnerList = queue:to_list(InnerQ),
- {Init1, QNew1} =
- case PFilter(Prefix) of
- true ->
- lists:foldr(
- fun (Value, {Acc, QNew2}) ->
- {Prefix1, Value1, Acc1} = Fun(Value, Acc),
- {Acc1, in_r(Prefix1, Value1, QNew2)}
- end, {Init, QNew}, InnerList);
- false ->
- {Init, join(from_list([{Prefix, InnerList}]), QNew)}
- end,
- map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1)
+ case PFilter(Prefix) of
+ true ->
+ {Init1, QNew1, Cont} =
+ map_fold_filter_r2(
+ Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()),
+ case Cont of
+ false ->
+ {join({Len - len(QNew1), Q1}, QNew1), Init1};
+ true ->
+ map_fold_filter_r1(
+ Len, PFilter, Fun, Init1, Q1, QNew1)
+ end;
+ false ->
+ map_fold_filter_r1(
+ Len, PFilter, Fun, Init, Q1, in_q_r(Prefix, InnerQ, QNew))
+ end
+ end.
+
+map_fold_filter_r2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) ->
+ case queue:out_r(InnerQ) of
+ {empty, _Q} ->
+ {Init, in_q_r(OrigPrefix, InnerQ,
+ in_q_r(Prefix, InnerQNew, QNew)), true};
+ {{value, Value}, InnerQ1} ->
+ case Fun(Value, Init) of
+ stop ->
+ {Init, in_q_r(OrigPrefix, InnerQ,
+ in_q_r(Prefix, InnerQNew, QNew)), false};
+ {Prefix1, Value1, Init1} ->
+ case Prefix1 =:= Prefix of
+ true ->
+ map_fold_filter_r2(
+ Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew,
+ queue:in_r(Value1, InnerQNew));
+ false ->
+ map_fold_filter_r2(
+ Fun, OrigPrefix, Prefix1, Init1, InnerQ1,
+ in_q_r(Prefix, InnerQNew, QNew),
+ queue:in(Value1, queue:new()))
+ end
+ end
end.