diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-03 12:50:40 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-03 12:50:40 +0000 |
commit | 059e6cbc28d6865d14b50ea37397ece38d0c7aec (patch) | |
tree | fd4325a974f54be00e52bd858c714283306b46fb | |
parent | 3a3a22768e6002098ca99f5feb6429f9d9e764f5 (diff) | |
download | rabbitmq-server-059e6cbc28d6865d14b50ea37397ece38d0c7aec.tar.gz |
limit alpha->beta conversion through msg store credit flow
we also bump the IO_BATCH_SIZE; it's only used to limit beta->gamma
conversion, which is a fairly cheap operation (it only touches qi,
which has a very compact on-disk representation), and we want to make
sure that such conversion can keep up with the alpha->beta conversion.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 37 |
2 files changed, 23 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ec1a977d..9d240a3f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1235,7 +1235,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - noreply(State); + noreply(backing_queue_timeout(State)); handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73e9f6b5..a84f2e38 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -305,7 +305,7 @@ %% all. This is both a minimum and a maximum - we don't write fewer %% than IO_BATCH_SIZE indices out in one go, and we don't write more - %% we can always come back on the next publish to do more. --define(IO_BATCH_SIZE, 64). +-define(IO_BATCH_SIZE, 1024). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(QUEUE, lqueue). @@ -1601,8 +1601,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; - _ -> {Reduce, State1} + S2 when S2 >= ?IO_BATCH_SIZE -> + {true, BetaDeltaFun(?IO_BATCH_SIZE, State1)}; + _ -> + {Reduce, State1} end. limit_ram_acks(0, State) -> @@ -1648,7 +1650,7 @@ chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> 0; chunk_size(Current, Permitted) -> - lists:min([Current - Permitted, ?IO_BATCH_SIZE]). + Current - Permitted. fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, @@ -1754,17 +1756,22 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, TargetRamCount >= RamMsgCount -> {Quota, State}; push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> - case Generator(Q) of - {empty, _Q} -> - {Quota, State}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, - State1 = #vqstate { ram_msg_count = RamMsgCount }} = - maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 }, - push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, - Consumer(MsgStatus2, Qa, State2)) + case credit_flow:blocked() of + true -> {Quota, State}; + false -> case Generator(Q) of + {empty, _Q} -> + {Quota, State}; + {{value, MsgStatus}, Qa} -> + {MsgStatus1 = #msg_status { msg_on_disk = true }, + State1 = #vqstate { ram_msg_count = RamMsgCount }} = + maybe_write_to_disk(true, false, MsgStatus, State), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + State2 = Consumer(MsgStatus2, Qa, + State1 #vqstate { + ram_msg_count = RamMsgCount - 1 }), + push_alphas_to_betas(Generator, Consumer, Quota - 1, + Qa, State2) + end end. push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, |