diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-01-13 23:46:08 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-01-13 23:46:08 +0000 |
commit | c81bbf53513c34ce8be4f6e8308102c46955f928 (patch) | |
tree | d26b8260df9fb5b11e153fcef8c249d80975bcf4 /src/bpqueue.erl | |
parent | 263192acc15329753dc539a2c5b57278a7df6cda (diff) | |
download | rabbitmq-server-c81bbf53513c34ce8be4f6e8308102c46955f928.tar.gz |
Much better. The reason why batching is important is because if you're walking through the bpqueue and doing very little work before stopping then you don't really get the amortised constant time behaviour. But the goal is achieved - throughput is maintained and very slowly diminishes with no major interruptions and the queue gets fuller and the transition is made to betas and then deltas.
Diffstat (limited to 'src/bpqueue.erl')
-rw-r--r-- | src/bpqueue.erl | 164 |
1 files changed, 127 insertions, 37 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl index 7237473f..a556ec23 100644 --- a/src/bpqueue.erl +++ b/src/bpqueue.erl @@ -63,12 +63,12 @@ -spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). -spec(map_fold_filter_l/4 :: (fun ((prefix()) -> boolean()), - fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) -> - {bpqueue(), B}). + fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B, + bpqueue()) -> {bpqueue(), B}). -spec(map_fold_filter_r/4 :: (fun ((prefix()) -> boolean()), - fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) -> - {bpqueue(), B}). + fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B, + bpqueue()) -> {bpqueue(), B}). -endif. @@ -107,6 +107,40 @@ in_r(Prefix, Value, {N, Q}) -> queue:in_r({Prefix, queue:in(Value, queue:new())}, Q) end}. +in_q(Prefix, Queue, BPQ = {0, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + N -> {N, queue:in({Prefix, Queue}, Q)} + end; +in_q(Prefix, Queue, BPQ = {N, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + M -> {N + M, + case queue:out_r(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + queue:in({Prefix, queue:join(InnerQ, Queue)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + queue:in({Prefix, Queue}, Q) + end} + end. + +in_q_r(Prefix, Queue, BPQ = {0, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + N -> {N, queue:in({Prefix, Queue}, Q)} + end; +in_q_r(Prefix, Queue, BPQ = {N, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + M -> {N + M, + case queue:out(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + queue:in_r({Prefix, queue:join(Queue, InnerQ)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + queue:in_r({Prefix, Queue}, Q) + end} + end. + out({0, _Q} = BPQ) -> {empty, BPQ}; out({N, Q}) -> @@ -195,7 +229,7 @@ to_list1({Prefix, InnerQ}) -> %% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init} %% where FilterFun(Prefix) -> boolean() -%% Fun(Value, Init) -> {Prefix, Value, Init} +%% Fun(Value, Init) -> {Prefix, Value, Init} | stop %% %% The filter fun allows you to skip very quickly over blocks that %% you're not interested in. Such blocks appear in the resulting bpq @@ -204,50 +238,106 @@ to_list1({Prefix, InnerQ}) -> %% value, and also to modify the Init/Acc (just like a fold). map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> {BPQ, Init}; -map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) -> - map_fold_filter_l1(PFilter, Fun, Init, Q, new()). +map_fold_filter_l(PFilter, Fun, Init, {N, Q}) -> + map_fold_filter_l1(N, PFilter, Fun, Init, Q, new()). -map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) -> +map_fold_filter_l1(Len, PFilter, Fun, Init, Q, QNew) -> case queue:out(Q) of {empty, _Q} -> {QNew, Init}; {{value, {Prefix, InnerQ}}, Q1} -> - InnerList = queue:to_list(InnerQ), - {Init1, QNew1} = - case PFilter(Prefix) of - true -> - lists:foldl( - fun (Value, {Acc, QNew2}) -> - {Prefix1, Value1, Acc1} = Fun(Value, Acc), - {Acc1, in(Prefix1, Value1, QNew2)} - end, {Init, QNew}, InnerList); - false -> - {Init, join(QNew, from_list([{Prefix, InnerList}]))} - end, - map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1) + case PFilter(Prefix) of + true -> + {Init1, QNew1, Cont} = + map_fold_filter_l2( + Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()), + case Cont of + false -> + {join(QNew1, {Len - len(QNew1), Q1}), Init1}; + true -> + map_fold_filter_l1( + Len, PFilter, Fun, Init1, Q1, QNew1) + end; + false -> + map_fold_filter_l1( + Len, PFilter, Fun, Init, Q1, in_q(Prefix, InnerQ, QNew)) + end + end. + +map_fold_filter_l2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) -> + case queue:out(InnerQ) of + {empty, _Q} -> + {Init, in_q(OrigPrefix, InnerQ, + in_q(Prefix, InnerQNew, QNew)), true}; + {{value, Value}, InnerQ1} -> + case Fun(Value, Init) of + stop -> + {Init, in_q(OrigPrefix, InnerQ, + in_q(Prefix, InnerQNew, QNew)), false}; + {Prefix1, Value1, Init1} -> + case Prefix1 =:= Prefix of + true -> + map_fold_filter_l2( + Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew, + queue:in(Value1, InnerQNew)); + false -> + map_fold_filter_l2( + Fun, OrigPrefix, Prefix1, Init1, InnerQ1, + in_q(Prefix, InnerQNew, QNew), + queue:in(Value1, queue:new())) + end + end end. map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> {BPQ, Init}; -map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) -> - map_fold_filter_r1(PFilter, Fun, Init, Q, new()). +map_fold_filter_r(PFilter, Fun, Init, {N, Q}) -> + map_fold_filter_r1(N, PFilter, Fun, Init, Q, new()). -map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) -> +map_fold_filter_r1(Len, PFilter, Fun, Init, Q, QNew) -> case queue:out_r(Q) of {empty, _Q} -> {QNew, Init}; {{value, {Prefix, InnerQ}}, Q1} -> - InnerList = queue:to_list(InnerQ), - {Init1, QNew1} = - case PFilter(Prefix) of - true -> - lists:foldr( - fun (Value, {Acc, QNew2}) -> - {Prefix1, Value1, Acc1} = Fun(Value, Acc), - {Acc1, in_r(Prefix1, Value1, QNew2)} - end, {Init, QNew}, InnerList); - false -> - {Init, join(from_list([{Prefix, InnerList}]), QNew)} - end, - map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1) + case PFilter(Prefix) of + true -> + {Init1, QNew1, Cont} = + map_fold_filter_r2( + Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()), + case Cont of + false -> + {join({Len - len(QNew1), Q1}, QNew1), Init1}; + true -> + map_fold_filter_r1( + Len, PFilter, Fun, Init1, Q1, QNew1) + end; + false -> + map_fold_filter_r1( + Len, PFilter, Fun, Init, Q1, in_q_r(Prefix, InnerQ, QNew)) + end + end. + +map_fold_filter_r2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) -> + case queue:out_r(InnerQ) of + {empty, _Q} -> + {Init, in_q_r(OrigPrefix, InnerQ, + in_q_r(Prefix, InnerQNew, QNew)), true}; + {{value, Value}, InnerQ1} -> + case Fun(Value, Init) of + stop -> + {Init, in_q_r(OrigPrefix, InnerQ, + in_q_r(Prefix, InnerQNew, QNew)), false}; + {Prefix1, Value1, Init1} -> + case Prefix1 =:= Prefix of + true -> + map_fold_filter_r2( + Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew, + queue:in_r(Value1, InnerQNew)); + false -> + map_fold_filter_r2( + Fun, OrigPrefix, Prefix1, Init1, InnerQ1, + in_q_r(Prefix, InnerQNew, QNew), + queue:in(Value1, queue:new())) + end + end end. |