summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-13 23:46:08 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-13 23:46:08 +0000
commitc81bbf53513c34ce8be4f6e8308102c46955f928 (patch)
treed26b8260df9fb5b11e153fcef8c249d80975bcf4 /src
parent263192acc15329753dc539a2c5b57278a7df6cda (diff)
downloadrabbitmq-server-c81bbf53513c34ce8be4f6e8308102c46955f928.tar.gz
Much better. The reason why batching is important is because if you're walking through the bpqueue and doing very little work before stopping then you don't really get the amortised constant time behaviour. But the goal is achieved - throughput is maintained and very slowly diminishes with no major interruptions and the queue gets fuller and the transition is made to betas and then deltas.
Diffstat (limited to 'src')
-rw-r--r--src/bpqueue.erl164
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl89
3 files changed, 192 insertions, 75 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
index 7237473f..a556ec23 100644
--- a/src/bpqueue.erl
+++ b/src/bpqueue.erl
@@ -63,12 +63,12 @@
-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
-spec(map_fold_filter_l/4 ::
(fun ((prefix()) -> boolean()),
- fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
- {bpqueue(), B}).
+ fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
+ bpqueue()) -> {bpqueue(), B}).
-spec(map_fold_filter_r/4 ::
(fun ((prefix()) -> boolean()),
- fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) ->
- {bpqueue(), B}).
+ fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
+ bpqueue()) -> {bpqueue(), B}).
-endif.
@@ -107,6 +107,40 @@ in_r(Prefix, Value, {N, Q}) ->
queue:in_r({Prefix, queue:in(Value, queue:new())}, Q)
end}.
+in_q(Prefix, Queue, BPQ = {0, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ N -> {N, queue:in({Prefix, Queue}, Q)}
+ end;
+in_q(Prefix, Queue, BPQ = {N, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ M -> {N + M,
+ case queue:out_r(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in({Prefix, queue:join(InnerQ, Queue)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in({Prefix, Queue}, Q)
+ end}
+ end.
+
+in_q_r(Prefix, Queue, BPQ = {0, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ N -> {N, queue:in({Prefix, Queue}, Q)}
+ end;
+in_q_r(Prefix, Queue, BPQ = {N, Q}) ->
+ case queue:len(Queue) of
+ 0 -> BPQ;
+ M -> {N + M,
+ case queue:out(Q) of
+ {{value, {Prefix, InnerQ}}, Q1} ->
+ queue:in_r({Prefix, queue:join(Queue, InnerQ)}, Q1);
+ {{value, {_Prefix, _InnerQ}}, _Q1} ->
+ queue:in_r({Prefix, Queue}, Q)
+ end}
+ end.
+
out({0, _Q} = BPQ) ->
{empty, BPQ};
out({N, Q}) ->
@@ -195,7 +229,7 @@ to_list1({Prefix, InnerQ}) ->
%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init}
%% where FilterFun(Prefix) -> boolean()
-%% Fun(Value, Init) -> {Prefix, Value, Init}
+%% Fun(Value, Init) -> {Prefix, Value, Init} | stop
%%
%% The filter fun allows you to skip very quickly over blocks that
%% you're not interested in. Such blocks appear in the resulting bpq
@@ -204,50 +238,106 @@ to_list1({Prefix, InnerQ}) ->
%% value, and also to modify the Init/Acc (just like a fold).
map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
{BPQ, Init};
-map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) ->
- map_fold_filter_l1(PFilter, Fun, Init, Q, new()).
+map_fold_filter_l(PFilter, Fun, Init, {N, Q}) ->
+ map_fold_filter_l1(N, PFilter, Fun, Init, Q, new()).
-map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) ->
+map_fold_filter_l1(Len, PFilter, Fun, Init, Q, QNew) ->
case queue:out(Q) of
{empty, _Q} ->
{QNew, Init};
{{value, {Prefix, InnerQ}}, Q1} ->
- InnerList = queue:to_list(InnerQ),
- {Init1, QNew1} =
- case PFilter(Prefix) of
- true ->
- lists:foldl(
- fun (Value, {Acc, QNew2}) ->
- {Prefix1, Value1, Acc1} = Fun(Value, Acc),
- {Acc1, in(Prefix1, Value1, QNew2)}
- end, {Init, QNew}, InnerList);
- false ->
- {Init, join(QNew, from_list([{Prefix, InnerList}]))}
- end,
- map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1)
+ case PFilter(Prefix) of
+ true ->
+ {Init1, QNew1, Cont} =
+ map_fold_filter_l2(
+ Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()),
+ case Cont of
+ false ->
+ {join(QNew1, {Len - len(QNew1), Q1}), Init1};
+ true ->
+ map_fold_filter_l1(
+ Len, PFilter, Fun, Init1, Q1, QNew1)
+ end;
+ false ->
+ map_fold_filter_l1(
+ Len, PFilter, Fun, Init, Q1, in_q(Prefix, InnerQ, QNew))
+ end
+ end.
+
+map_fold_filter_l2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) ->
+ case queue:out(InnerQ) of
+ {empty, _Q} ->
+ {Init, in_q(OrigPrefix, InnerQ,
+ in_q(Prefix, InnerQNew, QNew)), true};
+ {{value, Value}, InnerQ1} ->
+ case Fun(Value, Init) of
+ stop ->
+ {Init, in_q(OrigPrefix, InnerQ,
+ in_q(Prefix, InnerQNew, QNew)), false};
+ {Prefix1, Value1, Init1} ->
+ case Prefix1 =:= Prefix of
+ true ->
+ map_fold_filter_l2(
+ Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew,
+ queue:in(Value1, InnerQNew));
+ false ->
+ map_fold_filter_l2(
+ Fun, OrigPrefix, Prefix1, Init1, InnerQ1,
+ in_q(Prefix, InnerQNew, QNew),
+ queue:in(Value1, queue:new()))
+ end
+ end
end.
map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
{BPQ, Init};
-map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) ->
- map_fold_filter_r1(PFilter, Fun, Init, Q, new()).
+map_fold_filter_r(PFilter, Fun, Init, {N, Q}) ->
+ map_fold_filter_r1(N, PFilter, Fun, Init, Q, new()).
-map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) ->
+map_fold_filter_r1(Len, PFilter, Fun, Init, Q, QNew) ->
case queue:out_r(Q) of
{empty, _Q} ->
{QNew, Init};
{{value, {Prefix, InnerQ}}, Q1} ->
- InnerList = queue:to_list(InnerQ),
- {Init1, QNew1} =
- case PFilter(Prefix) of
- true ->
- lists:foldr(
- fun (Value, {Acc, QNew2}) ->
- {Prefix1, Value1, Acc1} = Fun(Value, Acc),
- {Acc1, in_r(Prefix1, Value1, QNew2)}
- end, {Init, QNew}, InnerList);
- false ->
- {Init, join(from_list([{Prefix, InnerList}]), QNew)}
- end,
- map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1)
+ case PFilter(Prefix) of
+ true ->
+ {Init1, QNew1, Cont} =
+ map_fold_filter_r2(
+ Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()),
+ case Cont of
+ false ->
+ {join({Len - len(QNew1), Q1}, QNew1), Init1};
+ true ->
+ map_fold_filter_r1(
+ Len, PFilter, Fun, Init1, Q1, QNew1)
+ end;
+ false ->
+ map_fold_filter_r1(
+ Len, PFilter, Fun, Init, Q1, in_q_r(Prefix, InnerQ, QNew))
+ end
+ end.
+
+map_fold_filter_r2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) ->
+ case queue:out_r(InnerQ) of
+ {empty, _Q} ->
+ {Init, in_q_r(OrigPrefix, InnerQ,
+ in_q_r(Prefix, InnerQNew, QNew)), true};
+ {{value, Value}, InnerQ1} ->
+ case Fun(Value, Init) of
+ stop ->
+ {Init, in_q_r(OrigPrefix, InnerQ,
+ in_q_r(Prefix, InnerQNew, QNew)), false};
+ {Prefix1, Value1, Init1} ->
+ case Prefix1 =:= Prefix of
+ true ->
+ map_fold_filter_r2(
+ Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew,
+ queue:in_r(Value1, InnerQNew));
+ false ->
+ map_fold_filter_r2(
+ Fun, OrigPrefix, Prefix1, Init1, InnerQ1,
+ in_q_r(Prefix, InnerQNew, QNew),
+ queue:in(Value1, queue:new()))
+ end
+ end
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 45b48017..291f4cb0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -239,7 +239,8 @@ test_bpqueue() ->
fun (foo) -> true;
(_) -> false
end,
- fun (V, Num) -> {bar, -V, V - Num} end,
+ fun (2, _Num) -> stop;
+ (V, Num) -> {bar, -V, V - Num} end,
0, Qn)
end,
@@ -248,14 +249,15 @@ test_bpqueue() ->
fun (foo) -> true;
(_) -> false
end,
- fun (V, Num) -> {bar, -V, V - Num} end,
+ fun (2, _Num) -> stop;
+ (V, Num) -> {bar, -V, V - Num} end,
0, Qn)
end,
- {Q9, 1} = F1(Q1), %% 2 - (1 - 0) == 1
- [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q9),
- {Q10, -1} = F2(Q1), %% 1 - (2 - 0) == -1
- [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q10),
+ {Q9, 1} = F1(Q1),
+ [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = bpqueue:to_list(Q9),
+ {Q10, 0} = F2(Q1),
+ [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(Q10),
{Q11, 0} = F1(Q),
[] = bpqueue:to_list(Q11),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6c5efbd5..a62a90ce 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -78,8 +78,20 @@
index_on_disk
}).
--define(RAM_INDEX_TARGET_RATIO, 256).
--define(RAM_INDEX_MAX_WORK, 32).
+%% If there are N msgs in the q, and M of them are betas, then it is
+%% required that RAM_INDEX_BETA_RATIO * (M/N) * M of those have their
+%% index on disk. Eg if RAM_INDEX_BETA_RATIO is 1.0, and there are 36
+%% msgs in the queue, of which 12 are betas, then 4 of those betas
+%% must have their index on disk.
+-define(RAM_INDEX_BETA_RATIO, 0.8).
+%% When we discover, on publish, that we should write some indices to
+%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
+%% betas that we must be due to write indices for before we do any
+%% work at all. This is both a minimum and a maximum - we don't write
+%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
+%% write more - we can always come back on the next publish to do
+%% more.
+-define(RAM_INDEX_BATCH_SIZE, 1024).
%%----------------------------------------------------------------------------
@@ -577,6 +589,26 @@ beta_fold_no_index_on_disk(Fun, Init, Q) ->
Fun(Value, Acc)
end, Init, Q).
+permitted_ram_index_count(#vqstate { len = 0 }) ->
+ undefined;
+permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3 }) ->
+ case bpqueue:len(Q2) + bpqueue:len(Q3) of
+ 0 ->
+ undefined;
+ BetaLength ->
+ %% the fraction of the queue that are betas
+ BetaFrac = BetaLength / Len,
+ BetaLength - trunc(BetaFrac * BetaLength * ?RAM_INDEX_BETA_RATIO)
+ end.
+
+
+should_force_index_to_disk(State =
+ #vqstate { ram_index_count = RamIndexCount }) ->
+ case permitted_ram_index_count(State) of
+ undefined -> false;
+ Permitted -> RamIndexCount >= Permitted
+ end.
+
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -771,17 +803,10 @@ publish(msg, MsgStatus, State = #vqstate { index_state = IndexState,
publish(index, MsgStatus, State =
#vqstate { index_state = IndexState, q1 = Q1,
- ram_index_count = RamIndexCount,
- target_ram_msg_count = TargetRamMsgCount }) ->
+ ram_index_count = RamIndexCount }) ->
MsgStatus1 = #msg_status { msg_on_disk = true } =
maybe_write_msg_to_disk(true, MsgStatus),
- ForceIndex = case TargetRamMsgCount of
- undefined ->
- false;
- _ ->
- RamIndexCount >= (?RAM_INDEX_TARGET_RATIO *
- TargetRamMsgCount)
- end,
+ ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of
@@ -872,17 +897,24 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
%% Phase changes
%%----------------------------------------------------------------------------
-limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount,
- target_ram_msg_count = TargetRamMsgCount })
- when RamIndexCount > ?RAM_INDEX_TARGET_RATIO * TargetRamMsgCount ->
- Reduction = lists:min([?RAM_INDEX_MAX_WORK,
- RamIndexCount - (?RAM_INDEX_TARGET_RATIO *
- TargetRamMsgCount)]),
- {Reduction1, State1} = limit_q2_ram_index(Reduction, State),
- {_Reduction2, State2} = limit_q3_ram_index(Reduction1, State1),
- State2;
-limit_ram_index(State) ->
- State.
+limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
+ case permitted_ram_index_count(State) of
+ undefined ->
+ State;
+ Permitted when RamIndexCount > Permitted ->
+ Reduction = lists:min([RamIndexCount - Permitted,
+ ?RAM_INDEX_BATCH_SIZE]),
+ case Reduction < ?RAM_INDEX_BATCH_SIZE of
+ true ->
+ State;
+ false ->
+ {Reduction1, State1} = limit_q2_ram_index(Reduction, State),
+ {_Red2, State2} = limit_q3_ram_index(Reduction1, State1),
+ State2
+ end;
+ _ ->
+ State
+ end.
limit_q2_ram_index(Reduction, State = #vqstate { q2 = Q2 })
when Reduction > 0 ->
@@ -908,9 +940,9 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State =
{Qa, {Reduction1, IndexState1}} =
MapFoldFilterFun(
fun erlang:'not'/1,
- fun (MsgStatus, {0, _IndexStateN} = Acc) ->
+ fun (MsgStatus, {0, _IndexStateN}) ->
false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- {false, MsgStatus, Acc};
+ stop;
(MsgStatus, {N, IndexStateN}) when N > 0 ->
false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
{MsgStatus1, IndexStateN1} =
@@ -986,19 +1018,12 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
maybe_push_alphas_to_betas(
Generator, Consumer, Q, State =
#vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
- target_ram_msg_count = TargetRamMsgCount,
index_state = IndexState }) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
MsgStatus1 = maybe_write_msg_to_disk(true, MsgStatus),
- ForceIndex = case TargetRamMsgCount of
- undefined ->
- false;
- _ ->
- RamIndexCount >= (?RAM_INDEX_TARGET_RATIO *
- TargetRamMsgCount)
- end,
+ ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of