summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-03-03 12:50:40 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-03-03 12:50:40 +0000
commit059e6cbc28d6865d14b50ea37397ece38d0c7aec (patch)
treefd4325a974f54be00e52bd858c714283306b46fb
parent3a3a22768e6002098ca99f5feb6429f9d9e764f5 (diff)
downloadrabbitmq-server-059e6cbc28d6865d14b50ea37397ece38d0c7aec.tar.gz
limit alpha->beta conversion through msg store credit flow
we also bump the IO_BATCH_SIZE; it's only used to limit beta->gamma conversion, which is a fairly cheap operation (it only touches qi, which has a very compact on-disk representation), and we want to make sure that such conversion can keep up with the alpha->beta conversion.
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_variable_queue.erl37
2 files changed, 23 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ec1a977d..9d240a3f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1235,7 +1235,7 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- noreply(State);
+ noreply(backing_queue_timeout(State));
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 73e9f6b5..a84f2e38 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -305,7 +305,7 @@
%% all. This is both a minimum and a maximum - we don't write fewer
%% than IO_BATCH_SIZE indices out in one go, and we don't write more -
%% we can always come back on the next publish to do more.
--define(IO_BATCH_SIZE, 64).
+-define(IO_BATCH_SIZE, 1024).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(QUEUE, lqueue).
@@ -1601,8 +1601,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
permitted_beta_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, State1}
+ S2 when S2 >= ?IO_BATCH_SIZE ->
+ {true, BetaDeltaFun(?IO_BATCH_SIZE, State1)};
+ _ ->
+ {Reduce, State1}
end.
limit_ram_acks(0, State) ->
@@ -1648,7 +1650,7 @@ chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
0;
chunk_size(Current, Permitted) ->
- lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
+ Current - Permitted.
fetch_from_q3(State = #vqstate { q1 = Q1,
q2 = Q2,
@@ -1754,17 +1756,22 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
TargetRamCount >= RamMsgCount ->
{Quota, State};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
- case Generator(Q) of
- {empty, _Q} ->
- {Quota, State};
- {{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true },
- State1 = #vqstate { ram_msg_count = RamMsgCount }} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
- push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
- Consumer(MsgStatus2, Qa, State2))
+ case credit_flow:blocked() of
+ true -> {Quota, State};
+ false -> case Generator(Q) of
+ {empty, _Q} ->
+ {Quota, State};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1 = #msg_status { msg_on_disk = true },
+ State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = Consumer(MsgStatus2, Qa,
+ State1 #vqstate {
+ ram_msg_count = RamMsgCount - 1 }),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State2)
+ end
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,